• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 0188d7c9-fb87-419e-b893-61f23adef50c

20 Jun 2023 07:51AM UTC coverage: 57.299% (+0.04%) from 57.263%
0188d7c9-fb87-419e-b893-61f23adef50c

push

buildkite

web-flow
Show total number workflows while waiting for the listall command to … (#5309)

* Show total number workflows while waiting for the listall command to complete

* Update error message unable to count workflows

* Add missing method calls in TestWorkflowList

* Fix missing method calls in TestWorkflowList

* Update mock based method ListClosedWorkflowExecutions

* Add expected method calls

* Add expected method calls

* Fix TestListArchivedWorkflow test

14 of 14 new or added lines in 1 file covered. (100.0%)

87125 of 152053 relevant lines covered (57.3%)

2485.21 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

37.62
/tools/cli/workflowCommands.go
1
// Copyright (c) 2017-2020 Uber Technologies Inc.
2
// Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.
3
//
4
// Permission is hereby granted, free of charge, to any person obtaining a copy
5
// of this software and associated documentation files (the "Software"), to deal
6
// in the Software without restriction, including without limitation the rights
7
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8
// copies of the Software, and to permit persons to whom the Software is
9
// furnished to do so, subject to the following conditions:
10
//
11
// The above copyright notice and this permission notice shall be included in
12
// all copies or substantial portions of the Software.
13
//
14
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20
// THE SOFTWARE.
21

22
package cli
23

24
import (
25
        "bufio"
26
        "context"
27
        "encoding/json"
28
        "errors"
29
        "fmt"
30
        "io/ioutil"
31
        "math"
32
        "math/rand"
33
        "os"
34
        "regexp"
35
        "strconv"
36
        "strings"
37
        "sync"
38
        "time"
39

40
        "github.com/olekukonko/tablewriter"
41
        "github.com/pborman/uuid"
42
        "github.com/urfave/cli"
43

44
        "github.com/uber/cadence/client/frontend"
45
        "github.com/uber/cadence/common"
46
        "github.com/uber/cadence/common/clock"
47
        "github.com/uber/cadence/common/types"
48
        "github.com/uber/cadence/service/history/execution"
49
)
50

51
// RestartWorkflow restarts a workflow execution
52
func RestartWorkflow(c *cli.Context) {
2✔
53
        wfClient := getWorkflowClient(c)
2✔
54

2✔
55
        domain := getRequiredGlobalOption(c, FlagDomain)
2✔
56
        wid := getRequiredOption(c, FlagWorkflowID)
2✔
57
        rid := c.String(FlagRunID)
2✔
58

2✔
59
        ctx, cancel := newContext(c)
2✔
60
        defer cancel()
2✔
61
        resp, err := wfClient.RestartWorkflowExecution(
2✔
62
                ctx,
2✔
63
                &types.RestartWorkflowExecutionRequest{
2✔
64
                        Domain: domain,
2✔
65
                        WorkflowExecution: &types.WorkflowExecution{
2✔
66
                                WorkflowID: wid,
2✔
67
                                RunID:      rid,
2✔
68
                        }, Identity: getCliIdentity(),
2✔
69
                },
2✔
70
        )
2✔
71

2✔
72
        if err != nil {
3✔
73
                ErrorAndExit("Restart workflow failed.", err)
1✔
74
        } else {
2✔
75
                fmt.Printf("Restarted Workflow Id: %s, run Id: %s\n", wid, resp.GetRunID())
1✔
76
        }
1✔
77
}
78

79
// ShowHistory shows the history of given workflow execution based on workflowID and runID.
80
func ShowHistory(c *cli.Context) {
3✔
81
        wid := getRequiredOption(c, FlagWorkflowID)
3✔
82
        rid := c.String(FlagRunID)
3✔
83
        showHistoryHelper(c, wid, rid)
3✔
84
}
3✔
85

86
// ShowHistoryWithWID shows the history of given workflow with workflow_id
87
func ShowHistoryWithWID(c *cli.Context) {
1✔
88
        if !c.Args().Present() {
1✔
89
                ErrorAndExit("Argument workflow_id is required.", nil)
×
90
        }
×
91
        wid := c.Args().First()
1✔
92
        rid := ""
1✔
93
        if c.NArg() >= 2 {
1✔
94
                rid = c.Args().Get(1)
×
95
        }
×
96
        showHistoryHelper(c, wid, rid)
1✔
97
}
98

99
func showHistoryHelper(c *cli.Context, wid, rid string) {
4✔
100
        wfClient := getWorkflowClient(c)
4✔
101

4✔
102
        domain := getRequiredGlobalOption(c, FlagDomain)
4✔
103
        printDateTime := c.Bool(FlagPrintDateTime)
4✔
104
        printRawTime := c.Bool(FlagPrintRawTime)
4✔
105
        printFully := c.Bool(FlagPrintFullyDetail)
4✔
106
        printVersion := c.Bool(FlagPrintEventVersion)
4✔
107
        outputFileName := c.String(FlagOutputFilename)
4✔
108
        var maxFieldLength int
4✔
109
        if c.IsSet(FlagMaxFieldLength) || !printFully {
8✔
110
                maxFieldLength = c.Int(FlagMaxFieldLength)
4✔
111
        }
4✔
112
        resetPointsOnly := c.Bool(FlagResetPointsOnly)
4✔
113

4✔
114
        ctx, cancel := newContext(c)
4✔
115
        defer cancel()
4✔
116
        history, err := GetHistory(ctx, wfClient, domain, wid, rid)
4✔
117
        if err != nil {
4✔
118
                ErrorAndExit(fmt.Sprintf("Failed to get history on workflow id: %s, run id: %s.", wid, rid), err)
×
119
        }
×
120

121
        prevEvent := types.HistoryEvent{}
4✔
122
        if printFully { // dump everything
4✔
123
                for _, e := range history.Events {
×
124
                        if resetPointsOnly {
×
125
                                if prevEvent.GetEventType() != types.EventTypeDecisionTaskStarted {
×
126
                                        prevEvent = *e
×
127
                                        continue
×
128
                                }
129
                                prevEvent = *e
×
130
                        }
131
                        fmt.Println(anyToString(e, true, maxFieldLength))
×
132
                }
133
        } else if c.IsSet(FlagEventID) { // only dump that event
4✔
134
                eventID := c.Int(FlagEventID)
×
135
                if eventID <= 0 || eventID > len(history.Events) {
×
136
                        ErrorAndExit("EventId out of range.", fmt.Errorf("number should be 1 - %d inclusive", len(history.Events)))
×
137
                }
×
138
                e := history.Events[eventID-1]
×
139
                fmt.Println(anyToString(e, true, 0))
×
140
        } else { // use table to pretty output, will trim long text
4✔
141
                table := tablewriter.NewWriter(os.Stdout)
4✔
142
                table.SetBorder(false)
4✔
143
                table.SetColumnSeparator("")
4✔
144
                for _, e := range history.Events {
8✔
145
                        if resetPointsOnly {
4✔
146
                                if prevEvent.GetEventType() != types.EventTypeDecisionTaskStarted {
×
147
                                        prevEvent = *e
×
148
                                        continue
×
149
                                }
150
                                prevEvent = *e
×
151
                        }
152

153
                        columns := []string{}
4✔
154
                        columns = append(columns, strconv.FormatInt(e.ID, 10))
4✔
155

4✔
156
                        if printRawTime {
5✔
157
                                columns = append(columns, strconv.FormatInt(e.GetTimestamp(), 10))
1✔
158
                        } else if printDateTime {
5✔
159
                                columns = append(columns, convertTime(e.GetTimestamp(), false))
1✔
160
                        }
1✔
161
                        if printVersion {
4✔
162
                                columns = append(columns, fmt.Sprintf("(Version: %v)", e.Version))
×
163
                        }
×
164

165
                        columns = append(columns, ColorEvent(e), HistoryEventToString(e, false, maxFieldLength))
4✔
166
                        table.Append(columns)
4✔
167
                }
168
                table.Render()
4✔
169
        }
170

171
        if outputFileName != "" {
4✔
172
                serializer := &JSONHistorySerializer{}
×
173
                data, err := serializer.Serialize(history)
×
174
                if err != nil {
×
175
                        ErrorAndExit("Failed to serialize history data.", err)
×
176
                }
×
177
                if err := ioutil.WriteFile(outputFileName, data, 0666); err != nil {
×
178
                        ErrorAndExit("Failed to export history data file.", err)
×
179
                }
×
180
        }
181

182
        // finally append activities with retry
183
        frontendClient := cFactory.ServerFrontendClient(c)
4✔
184
        resp, err := frontendClient.DescribeWorkflowExecution(ctx, &types.DescribeWorkflowExecutionRequest{
4✔
185
                Domain: domain,
4✔
186
                Execution: &types.WorkflowExecution{
4✔
187
                        WorkflowID: wid,
4✔
188
                        RunID:      rid,
4✔
189
                },
4✔
190
        })
4✔
191
        if err != nil {
4✔
192
                if _, ok := err.(*types.EntityNotExistsError); ok {
×
193
                        fmt.Printf("%s %s\n", colorRed("Error:"), err)
×
194
                        return
×
195
                }
×
196
                ErrorAndExit("Describe workflow execution failed, cannot get information of pending activities", err)
×
197
        }
198
        fmt.Println("History Source: Default Storage")
4✔
199

4✔
200
        descOutput := convertDescribeWorkflowExecutionResponse(resp, frontendClient, c)
4✔
201
        if len(descOutput.PendingActivities) > 0 {
4✔
202
                fmt.Println("============Workflow Pending activities============")
×
203
                prettyPrintJSONObject(descOutput.PendingActivities)
×
204
                fmt.Println("NOTE: ActivityStartedEvent with retry policy will be written into history when the activity is finished.")
×
205
        }
×
206

207
}
208

209
// StartWorkflow starts a new workflow execution
210
func StartWorkflow(c *cli.Context) {
3✔
211
        startWorkflowHelper(c, false)
3✔
212
}
3✔
213

214
// RunWorkflow starts a new workflow execution and print workflow progress and result
215
func RunWorkflow(c *cli.Context) {
3✔
216
        startWorkflowHelper(c, true)
3✔
217
}
3✔
218

219
func startWorkflowHelper(c *cli.Context, shouldPrintProgress bool) {
6✔
220
        serviceClient := cFactory.ServerFrontendClient(c)
6✔
221

6✔
222
        startRequest := constructStartWorkflowRequest(c)
6✔
223
        domain := startRequest.GetDomain()
6✔
224
        wid := startRequest.GetWorkflowID()
6✔
225
        workflowType := startRequest.WorkflowType.GetName()
6✔
226
        taskList := startRequest.TaskList.GetName()
6✔
227
        input := string(startRequest.Input)
6✔
228

6✔
229
        startFn := func() {
9✔
230
                tcCtx, cancel := newContext(c)
3✔
231
                defer cancel()
3✔
232
                resp, err := serviceClient.StartWorkflowExecution(tcCtx, startRequest)
3✔
233

3✔
234
                if err != nil {
4✔
235
                        ErrorAndExit("Failed to create workflow.", err)
1✔
236
                } else {
3✔
237
                        fmt.Printf("Started Workflow Id: %s, run Id: %s\n", wid, resp.GetRunID())
2✔
238
                }
2✔
239
        }
240

241
        runFn := func() {
9✔
242
                tcCtx, cancel := newContextForLongPoll(c)
3✔
243
                defer cancel()
3✔
244
                resp, err := serviceClient.StartWorkflowExecution(tcCtx, startRequest)
3✔
245

3✔
246
                if err != nil {
4✔
247
                        ErrorAndExit("Failed to run workflow.", err)
1✔
248
                }
1✔
249

250
                // print execution summary
251
                fmt.Println(colorMagenta("Running execution:"))
3✔
252
                table := tablewriter.NewWriter(os.Stdout)
3✔
253
                executionData := [][]string{
3✔
254
                        {"Workflow Id", wid},
3✔
255
                        {"Run Id", resp.GetRunID()},
3✔
256
                        {"Type", workflowType},
3✔
257
                        {"Domain", domain},
3✔
258
                        {"Task List", taskList},
3✔
259
                        {"Args", truncate(input)}, // in case of large input
3✔
260
                }
3✔
261
                table.SetBorder(false)
3✔
262
                table.SetColumnSeparator(":")
3✔
263
                table.AppendBulk(executionData) // Add Bulk Data
3✔
264
                table.Render()
3✔
265

3✔
266
                printWorkflowProgress(c, domain, wid, resp.GetRunID())
3✔
267
        }
268

269
        if shouldPrintProgress {
9✔
270
                runFn()
3✔
271
        } else {
6✔
272
                startFn()
3✔
273
        }
3✔
274
}
275

276
func constructStartWorkflowRequest(c *cli.Context) *types.StartWorkflowExecutionRequest {
6✔
277
        domain := getRequiredGlobalOption(c, FlagDomain)
6✔
278
        taskList := getRequiredOption(c, FlagTaskList)
6✔
279
        workflowType := getRequiredOption(c, FlagWorkflowType)
6✔
280
        et := c.Int(FlagExecutionTimeout)
6✔
281
        if et == 0 {
6✔
282
                ErrorAndExit(fmt.Sprintf("Option %s format is invalid.", FlagExecutionTimeout), nil)
×
283
        }
×
284
        dt := c.Int(FlagDecisionTimeout)
6✔
285
        wid := c.String(FlagWorkflowID)
6✔
286
        if len(wid) == 0 {
8✔
287
                wid = uuid.New()
2✔
288
        }
2✔
289
        reusePolicy := defaultWorkflowIDReusePolicy.Ptr()
6✔
290
        if c.IsSet(FlagWorkflowIDReusePolicy) {
6✔
291
                reusePolicy = getWorkflowIDReusePolicy(c.Int(FlagWorkflowIDReusePolicy))
×
292
        }
×
293

294
        input := processJSONInput(c)
6✔
295
        startRequest := &types.StartWorkflowExecutionRequest{
6✔
296
                RequestID:  uuid.New(),
6✔
297
                Domain:     domain,
6✔
298
                WorkflowID: wid,
6✔
299
                WorkflowType: &types.WorkflowType{
6✔
300
                        Name: workflowType,
6✔
301
                },
6✔
302
                TaskList: &types.TaskList{
6✔
303
                        Name: taskList,
6✔
304
                },
6✔
305
                Input:                               []byte(input),
6✔
306
                ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(int32(et)),
6✔
307
                TaskStartToCloseTimeoutSeconds:      common.Int32Ptr(int32(dt)),
6✔
308
                Identity:                            getCliIdentity(),
6✔
309
                WorkflowIDReusePolicy:               reusePolicy,
6✔
310
        }
6✔
311
        if c.IsSet(FlagCronSchedule) {
6✔
312
                startRequest.CronSchedule = c.String(FlagCronSchedule)
×
313
        }
×
314

315
        if c.IsSet(FlagRetryAttempts) || c.IsSet(FlagRetryExpiration) {
6✔
316
                startRequest.RetryPolicy = &types.RetryPolicy{
×
317
                        InitialIntervalInSeconds: int32(c.Int(FlagRetryInterval)),
×
318
                        BackoffCoefficient:       c.Float64(FlagRetryBackoff),
×
319
                }
×
320

×
321
                if c.IsSet(FlagRetryAttempts) {
×
322
                        startRequest.RetryPolicy.MaximumAttempts = int32(c.Int(FlagRetryAttempts))
×
323
                }
×
324
                if c.IsSet(FlagRetryExpiration) {
×
325
                        startRequest.RetryPolicy.ExpirationIntervalInSeconds = int32(c.Int(FlagRetryExpiration))
×
326
                }
×
327
                if c.IsSet(FlagRetryMaxInterval) {
×
328
                        startRequest.RetryPolicy.MaximumIntervalInSeconds = int32(c.Int(FlagRetryMaxInterval))
×
329
                }
×
330
        }
331

332
        if c.IsSet(DelayStartSeconds) {
6✔
333
                startRequest.DelayStartSeconds = common.Int32Ptr(int32(c.Int(DelayStartSeconds)))
×
334
        }
×
335

336
        if c.IsSet(JitterStartSeconds) {
6✔
337
                startRequest.JitterStartSeconds = common.Int32Ptr(int32(c.Int(JitterStartSeconds)))
×
338
        }
×
339

340
        headerFields := processHeader(c)
6✔
341
        if len(headerFields) != 0 {
6✔
342
                startRequest.Header = &types.Header{Fields: headerFields}
×
343
        }
×
344

345
        memoFields := processMemo(c)
6✔
346
        if len(memoFields) != 0 {
6✔
347
                startRequest.Memo = &types.Memo{Fields: memoFields}
×
348
        }
×
349

350
        searchAttrFields := processSearchAttr(c)
6✔
351
        if len(searchAttrFields) != 0 {
6✔
352
                startRequest.SearchAttributes = &types.SearchAttributes{IndexedFields: searchAttrFields}
×
353
        }
×
354

355
        return startRequest
6✔
356
}
357

358
func processSearchAttr(c *cli.Context) map[string][]byte {
6✔
359
        rawSearchAttrKey := c.String(FlagSearchAttributesKey)
6✔
360
        var searchAttrKeys []string
6✔
361
        if strings.TrimSpace(rawSearchAttrKey) != "" {
6✔
362
                searchAttrKeys = trimSpace(strings.Split(rawSearchAttrKey, searchAttrInputSeparator))
×
363
        }
×
364

365
        rawSearchAttrVal := c.String(FlagSearchAttributesVal)
6✔
366
        var searchAttrVals []interface{}
6✔
367
        if strings.TrimSpace(rawSearchAttrVal) != "" {
6✔
368
                searchAttrValsStr := trimSpace(strings.Split(rawSearchAttrVal, searchAttrInputSeparator))
×
369

×
370
                for _, v := range searchAttrValsStr {
×
371
                        searchAttrVals = append(searchAttrVals, convertStringToRealType(v))
×
372
                }
×
373
        }
374

375
        if len(searchAttrKeys) != len(searchAttrVals) {
6✔
376
                ErrorAndExit("Number of search attributes keys and values are not equal.", nil)
×
377
        }
×
378

379
        fields := map[string][]byte{}
6✔
380
        for i, key := range searchAttrKeys {
6✔
381
                val, err := json.Marshal(searchAttrVals[i])
×
382
                if err != nil {
×
383
                        ErrorAndExit(fmt.Sprintf("Encode value %v error", val), err)
×
384
                }
×
385
                fields[key] = val
×
386
        }
387

388
        return fields
6✔
389
}
390

391
func processHeader(c *cli.Context) map[string][]byte {
6✔
392
        headerKeys := processMultipleKeys(c.String(FlagHeaderKey), " ")
6✔
393
        headerValues := processMultipleJSONValues(processJSONInputHelper(c, jsonTypeHeader))
6✔
394

6✔
395
        if len(headerKeys) != len(headerValues) {
6✔
396
                ErrorAndExit("Number of header keys and values are not equal.", nil)
×
397
        }
×
398

399
        return mapFromKeysValues(headerKeys, headerValues)
6✔
400
}
401

402
func processMemo(c *cli.Context) map[string][]byte {
6✔
403
        memoKeys := processMultipleKeys(c.String(FlagMemoKey), " ")
6✔
404
        memoValues := processMultipleJSONValues(processJSONInputHelper(c, jsonTypeMemo))
6✔
405

6✔
406
        if len(memoKeys) != len(memoValues) {
6✔
407
                ErrorAndExit("Number of memo keys and values are not equal.", nil)
×
408
        }
×
409

410
        return mapFromKeysValues(memoKeys, memoValues)
6✔
411
}
412

413
// helper function to print workflow progress with time refresh every second
414
func printWorkflowProgress(c *cli.Context, domain, wid, rid string) {
7✔
415
        fmt.Println(colorMagenta("Progress:"))
7✔
416

7✔
417
        wfClient := getWorkflowClient(c)
7✔
418
        timeElapse := 1
7✔
419
        isTimeElapseExist := false
7✔
420
        doneChan := make(chan bool)
7✔
421
        var lastEvent *types.HistoryEvent // used for print result of this run
7✔
422
        ticker := time.NewTicker(time.Second).C
7✔
423

7✔
424
        tcCtx, cancel := newIndefiniteContext(c)
7✔
425
        defer cancel()
7✔
426

7✔
427
        showDetails := c.Bool(FlagShowDetail)
7✔
428
        var maxFieldLength int
7✔
429
        if c.IsSet(FlagMaxFieldLength) {
7✔
430
                maxFieldLength = c.Int(FlagMaxFieldLength)
×
431
        }
×
432

433
        go func() {
14✔
434
                iterator, err := GetWorkflowHistoryIterator(tcCtx, wfClient, domain, wid, rid, true, types.HistoryEventFilterTypeAllEvent.Ptr())
7✔
435
                if err != nil {
7✔
436
                        ErrorAndExit("Unable to get history events.", err)
×
437
                }
×
438
                for iterator.HasNext() {
14✔
439
                        entity, err := iterator.Next()
7✔
440
                        event := entity.(*types.HistoryEvent)
7✔
441
                        if err != nil {
7✔
442
                                ErrorAndExit("Unable to read event.", err)
×
443
                        }
×
444
                        if isTimeElapseExist {
7✔
445
                                removePrevious2LinesFromTerminal()
×
446
                                isTimeElapseExist = false
×
447
                        }
×
448
                        if showDetails {
9✔
449
                                fmt.Printf("  %d, %s, %s, %s\n", event.ID, convertTime(event.GetTimestamp(), false), ColorEvent(event), HistoryEventToString(event, true, maxFieldLength))
2✔
450
                        } else {
7✔
451
                                fmt.Printf("  %d, %s, %s\n", event.ID, convertTime(event.GetTimestamp(), false), ColorEvent(event))
5✔
452
                        }
5✔
453
                        lastEvent = event
7✔
454
                }
455
                doneChan <- true
7✔
456
        }()
457

458
        for {
14✔
459
                select {
7✔
460
                case <-ticker:
×
461
                        if isTimeElapseExist {
×
462
                                removePrevious2LinesFromTerminal()
×
463
                        }
×
464
                        fmt.Printf("\nTime elapse: %ds\n", timeElapse)
×
465
                        isTimeElapseExist = true
×
466
                        timeElapse++
×
467
                case <-doneChan: // print result of this run
7✔
468
                        fmt.Println(colorMagenta("\nResult:"))
7✔
469
                        fmt.Printf("  Run Time: %d seconds\n", timeElapse)
7✔
470
                        printRunStatus(lastEvent)
7✔
471
                        return
7✔
472
                }
473
        }
474
}
475

476
// TerminateWorkflow terminates a workflow execution
477
func TerminateWorkflow(c *cli.Context) {
2✔
478
        wfClient := getWorkflowClient(c)
2✔
479

2✔
480
        domain := getRequiredGlobalOption(c, FlagDomain)
2✔
481
        wid := getRequiredOption(c, FlagWorkflowID)
2✔
482
        rid := c.String(FlagRunID)
2✔
483
        reason := c.String(FlagReason)
2✔
484

2✔
485
        ctx, cancel := newContext(c)
2✔
486
        defer cancel()
2✔
487
        err := wfClient.TerminateWorkflowExecution(
2✔
488
                ctx,
2✔
489
                &types.TerminateWorkflowExecutionRequest{
2✔
490
                        Domain: domain,
2✔
491
                        Reason: reason,
2✔
492
                        WorkflowExecution: &types.WorkflowExecution{
2✔
493
                                WorkflowID: wid,
2✔
494
                                RunID:      rid,
2✔
495
                        }, Identity: getCliIdentity(),
2✔
496
                },
2✔
497
        )
2✔
498

2✔
499
        if err != nil {
3✔
500
                ErrorAndExit("Terminate workflow failed.", err)
1✔
501
        } else {
2✔
502
                fmt.Println("Terminate workflow succeeded.")
1✔
503
        }
1✔
504
}
505

506
// CancelWorkflow cancels a workflow execution
507
func CancelWorkflow(c *cli.Context) {
2✔
508
        wfClient := getWorkflowClient(c)
2✔
509

2✔
510
        domain := getRequiredGlobalOption(c, FlagDomain)
2✔
511
        wid := getRequiredOption(c, FlagWorkflowID)
2✔
512
        rid := c.String(FlagRunID)
2✔
513
        reason := c.String(FlagReason)
2✔
514

2✔
515
        ctx, cancel := newContext(c)
2✔
516
        defer cancel()
2✔
517

2✔
518
        err := wfClient.RequestCancelWorkflowExecution(
2✔
519
                ctx,
2✔
520
                &types.RequestCancelWorkflowExecutionRequest{
2✔
521
                        Domain: domain,
2✔
522
                        WorkflowExecution: &types.WorkflowExecution{
2✔
523
                                WorkflowID: wid,
2✔
524
                                RunID:      rid,
2✔
525
                        },
2✔
526
                        Identity: getCliIdentity(),
2✔
527
                        Cause:    reason,
2✔
528
                },
2✔
529
        )
2✔
530
        if err != nil {
3✔
531
                ErrorAndExit("Cancel workflow failed.", err)
1✔
532
        } else {
2✔
533
                fmt.Println("Cancel workflow succeeded.")
1✔
534
        }
1✔
535
}
536

537
// SignalWorkflow signals a workflow execution
538
func SignalWorkflow(c *cli.Context) {
2✔
539
        serviceClient := cFactory.ServerFrontendClient(c)
2✔
540

2✔
541
        domain := getRequiredGlobalOption(c, FlagDomain)
2✔
542
        wid := getRequiredOption(c, FlagWorkflowID)
2✔
543
        rid := c.String(FlagRunID)
2✔
544
        name := getRequiredOption(c, FlagName)
2✔
545
        input := processJSONInput(c)
2✔
546

2✔
547
        tcCtx, cancel := newContext(c)
2✔
548
        defer cancel()
2✔
549
        err := serviceClient.SignalWorkflowExecution(
2✔
550
                tcCtx,
2✔
551
                &types.SignalWorkflowExecutionRequest{
2✔
552
                        Domain: domain,
2✔
553
                        WorkflowExecution: &types.WorkflowExecution{
2✔
554
                                WorkflowID: wid,
2✔
555
                                RunID:      rid,
2✔
556
                        },
2✔
557
                        SignalName: name,
2✔
558
                        Input:      []byte(input),
2✔
559
                        Identity:   getCliIdentity(),
2✔
560
                        RequestID:  uuid.New(),
2✔
561
                },
2✔
562
        )
2✔
563

2✔
564
        if err != nil {
3✔
565
                ErrorAndExit("Signal workflow failed.", err)
1✔
566
        } else {
2✔
567
                fmt.Println("Signal workflow succeeded.")
1✔
568
        }
1✔
569
}
570

571
// SignalWithStartWorkflowExecution starts a workflow execution if not already exists and signals it
572
func SignalWithStartWorkflowExecution(c *cli.Context) {
×
573
        serviceClient := cFactory.ServerFrontendClient(c)
×
574

×
575
        signalWithStartRequest := constructSignalWithStartWorkflowRequest(c)
×
576

×
577
        tcCtx, cancel := newContext(c)
×
578
        defer cancel()
×
579

×
580
        resp, err := serviceClient.SignalWithStartWorkflowExecution(tcCtx, signalWithStartRequest)
×
581
        if err != nil {
×
582
                ErrorAndExit("SignalWithStart workflow failed.", err)
×
583
        } else {
×
584
                fmt.Printf("SignalWithStart workflow succeeded. Workflow Id: %s, run Id: %s\n", signalWithStartRequest.GetWorkflowID(), resp.GetRunID())
×
585
        }
×
586
}
587

588
func constructSignalWithStartWorkflowRequest(c *cli.Context) *types.SignalWithStartWorkflowExecutionRequest {
×
589
        startRequest := constructStartWorkflowRequest(c)
×
590

×
591
        return &types.SignalWithStartWorkflowExecutionRequest{
×
592
                Domain:                              startRequest.Domain,
×
593
                WorkflowID:                          startRequest.WorkflowID,
×
594
                WorkflowType:                        startRequest.WorkflowType,
×
595
                TaskList:                            startRequest.TaskList,
×
596
                Input:                               startRequest.Input,
×
597
                ExecutionStartToCloseTimeoutSeconds: startRequest.ExecutionStartToCloseTimeoutSeconds,
×
598
                TaskStartToCloseTimeoutSeconds:      startRequest.TaskStartToCloseTimeoutSeconds,
×
599
                Identity:                            startRequest.Identity,
×
600
                RequestID:                           startRequest.RequestID,
×
601
                WorkflowIDReusePolicy:               startRequest.WorkflowIDReusePolicy,
×
602
                RetryPolicy:                         startRequest.RetryPolicy,
×
603
                CronSchedule:                        startRequest.CronSchedule,
×
604
                Memo:                                startRequest.Memo,
×
605
                SearchAttributes:                    startRequest.SearchAttributes,
×
606
                Header:                              startRequest.Header,
×
607
                SignalName:                          getRequiredOption(c, FlagName),
×
608
                SignalInput:                         []byte(processJSONInputSignal(c)),
×
609
                DelayStartSeconds:                   startRequest.DelayStartSeconds,
×
610
                JitterStartSeconds:                  startRequest.JitterStartSeconds,
×
611
        }
×
612
}
×
613

614
func processJSONInputSignal(c *cli.Context) string {
×
615
        return processJSONInputHelper(c, jsonTypeSignal)
×
616
}
×
617

618
// QueryWorkflow query workflow execution
619
func QueryWorkflow(c *cli.Context) {
2✔
620
        getRequiredGlobalOption(c, FlagDomain) // for pre-check and alert if not provided
2✔
621
        getRequiredOption(c, FlagWorkflowID)
2✔
622
        queryType := getRequiredOption(c, FlagQueryType)
2✔
623

2✔
624
        queryWorkflowHelper(c, queryType)
2✔
625
}
2✔
626

627
// QueryWorkflowUsingStackTrace query workflow execution using __stack_trace as query type
628
func QueryWorkflowUsingStackTrace(c *cli.Context) {
1✔
629
        queryWorkflowHelper(c, "__stack_trace")
1✔
630
}
1✔
631

632
func queryWorkflowHelper(c *cli.Context, queryType string) {
3✔
633
        serviceClient := cFactory.ServerFrontendClient(c)
3✔
634

3✔
635
        domain := getRequiredGlobalOption(c, FlagDomain)
3✔
636
        wid := getRequiredOption(c, FlagWorkflowID)
3✔
637
        rid := c.String(FlagRunID)
3✔
638
        input := processJSONInput(c)
3✔
639

3✔
640
        tcCtx, cancel := newContext(c)
3✔
641
        defer cancel()
3✔
642
        queryRequest := &types.QueryWorkflowRequest{
3✔
643
                Domain: domain,
3✔
644
                Execution: &types.WorkflowExecution{
3✔
645
                        WorkflowID: wid,
3✔
646
                        RunID:      rid,
3✔
647
                },
3✔
648
                Query: &types.WorkflowQuery{
3✔
649
                        QueryType: queryType,
3✔
650
                },
3✔
651
        }
3✔
652
        if input != "" {
3✔
653
                queryRequest.Query.QueryArgs = []byte(input)
×
654
        }
×
655
        if c.IsSet(FlagQueryRejectCondition) {
3✔
656
                var rejectCondition types.QueryRejectCondition
×
657
                switch c.String(FlagQueryRejectCondition) {
×
658
                case "not_open":
×
659
                        rejectCondition = types.QueryRejectConditionNotOpen
×
660
                case "not_completed_cleanly":
×
661
                        rejectCondition = types.QueryRejectConditionNotCompletedCleanly
×
662
                default:
×
663
                        ErrorAndExit(fmt.Sprintf("invalid reject condition %v, valid values are \"not_open\" and \"not_completed_cleanly\"", c.String(FlagQueryRejectCondition)), nil)
×
664
                }
665
                queryRequest.QueryRejectCondition = &rejectCondition
×
666
        }
667
        if c.IsSet(FlagQueryConsistencyLevel) {
3✔
668
                var consistencyLevel types.QueryConsistencyLevel
×
669
                switch c.String(FlagQueryConsistencyLevel) {
×
670
                case "eventual":
×
671
                        consistencyLevel = types.QueryConsistencyLevelEventual
×
672
                case "strong":
×
673
                        consistencyLevel = types.QueryConsistencyLevelStrong
×
674
                default:
×
675
                        ErrorAndExit(fmt.Sprintf("invalid query consistency level %v, valid values are \"eventual\" and \"strong\"", c.String(FlagQueryConsistencyLevel)), nil)
×
676
                }
677
                queryRequest.QueryConsistencyLevel = &consistencyLevel
×
678
        }
679
        queryResponse, err := serviceClient.QueryWorkflow(tcCtx, queryRequest)
3✔
680
        if err != nil {
4✔
681
                ErrorAndExit("Query workflow failed.", err)
1✔
682
                return
1✔
683
        }
1✔
684

685
        if queryResponse.QueryRejected != nil {
2✔
686
                fmt.Printf("Query was rejected, workflow is in state: %v\n", *queryResponse.QueryRejected.CloseStatus)
×
687
        } else {
2✔
688
                // assume it is json encoded
2✔
689
                fmt.Print(string(queryResponse.QueryResult))
2✔
690
        }
2✔
691
}
692

693
// ListWorkflow list workflow executions based on filters
694
func ListWorkflow(c *cli.Context) {
8✔
695
        displayPagedWorkflows(c, listWorkflows(c), !c.Bool(FlagMore))
8✔
696
}
8✔
697

698
// ListAllWorkflow list all workflow executions based on filters
699
func ListAllWorkflow(c *cli.Context) {
×
700
        displayAllWorkflows(c, filterExcludedWorkflows(c, listWorkflows(c)))
×
701
}
×
702

703
// ScanAllWorkflow list all workflow executions using Scan API.
704
// It should be faster than ListAllWorkflow, but result are not sorted.
705
func ScanAllWorkflow(c *cli.Context) {
×
706
        displayAllWorkflows(c, scanWorkflows(c))
×
707
}
×
708

709
func isQueryOpen(query string) bool {
9✔
710
        var openWFPattern = regexp.MustCompile(`CloseTime[ ]*=[ ]*missing`)
9✔
711
        return openWFPattern.MatchString(query)
9✔
712
}
9✔
713

714
// CountWorkflow count number of workflows
715
func CountWorkflow(c *cli.Context) {
2✔
716
        wfClient := getWorkflowClient(c)
2✔
717

2✔
718
        domain := getRequiredGlobalOption(c, FlagDomain)
2✔
719
        query := c.String(FlagListQuery)
2✔
720
        request := &types.CountWorkflowExecutionsRequest{
2✔
721
                Domain: domain,
2✔
722
                Query:  query,
2✔
723
        }
2✔
724

2✔
725
        ctx, cancel := newContextForLongPoll(c)
2✔
726
        defer cancel()
2✔
727
        response, err := wfClient.CountWorkflowExecutions(ctx, request)
2✔
728
        if err != nil {
2✔
729
                ErrorAndExit("Failed to count workflow.", err)
×
730
        }
×
731

732
        fmt.Println(response.GetCount())
2✔
733
}
734

735
// ListArchivedWorkflow lists archived workflow executions based on filters
736
func ListArchivedWorkflow(c *cli.Context) {
1✔
737
        printAll := c.Bool(FlagAll)
1✔
738
        if printAll {
2✔
739
                displayAllWorkflows(c, listArchivedWorkflows(c))
1✔
740
        } else {
1✔
741
                displayPagedWorkflows(c, listArchivedWorkflows(c), false)
×
742
        }
×
743
}
744

745
// DescribeWorkflow show information about the specified workflow execution
746
func DescribeWorkflow(c *cli.Context) {
×
747
        wid := getRequiredOption(c, FlagWorkflowID)
×
748
        rid := c.String(FlagRunID)
×
749

×
750
        describeWorkflowHelper(c, wid, rid)
×
751
}
×
752

753
// DescribeWorkflowWithID show information about the specified workflow execution
754
func DescribeWorkflowWithID(c *cli.Context) {
×
755
        if !c.Args().Present() {
×
756
                ErrorAndExit("Argument workflow_id is required.", nil)
×
757
        }
×
758
        wid := c.Args().First()
×
759
        rid := ""
×
760
        if c.NArg() >= 2 {
×
761
                rid = c.Args().Get(1)
×
762
        }
×
763

764
        describeWorkflowHelper(c, wid, rid)
×
765
}
766

767
func describeWorkflowHelper(c *cli.Context, wid, rid string) {
×
768
        frontendClient := cFactory.ServerFrontendClient(c)
×
769
        domain := getRequiredGlobalOption(c, FlagDomain)
×
770
        printRaw := c.Bool(FlagPrintRaw) // printRaw is false by default,
×
771
        // and will show datetime and decoded search attributes instead of raw timestamp and byte arrays
×
772
        printResetPointsOnly := c.Bool(FlagResetPointsOnly)
×
773

×
774
        ctx, cancel := newContext(c)
×
775
        defer cancel()
×
776

×
777
        resp, err := frontendClient.DescribeWorkflowExecution(ctx, &types.DescribeWorkflowExecutionRequest{
×
778
                Domain: domain,
×
779
                Execution: &types.WorkflowExecution{
×
780
                        WorkflowID: wid,
×
781
                        RunID:      rid,
×
782
                },
×
783
        })
×
784
        if err != nil {
×
785
                ErrorAndExit("Describe workflow execution failed", err)
×
786
        }
×
787

788
        if printResetPointsOnly {
×
789
                printAutoResetPoints(resp)
×
790
                return
×
791
        }
×
792

793
        var o interface{}
×
794
        if printRaw {
×
795
                o = resp
×
796
        } else {
×
797
                o = convertDescribeWorkflowExecutionResponse(resp, frontendClient, c)
×
798
        }
×
799

800
        prettyPrintJSONObject(o)
×
801
}
802

803
type AutoResetPointRow struct {
804
        BinaryChecksum string    `header:"Binary Checksum"`
805
        CreateTime     time.Time `header:"Create Time"`
806
        RunID          string    `header:"RunID"`
807
        EventID        int64     `header:"EventID"`
808
}
809

810
func printAutoResetPoints(resp *types.DescribeWorkflowExecutionResponse) {
×
811
        fmt.Println("Auto Reset Points:")
×
812
        table := []AutoResetPointRow{}
×
813
        if resp.WorkflowExecutionInfo.AutoResetPoints == nil || len(resp.WorkflowExecutionInfo.AutoResetPoints.Points) == 0 {
×
814
                return
×
815
        }
×
816
        for _, pt := range resp.WorkflowExecutionInfo.AutoResetPoints.Points {
×
817
                table = append(table, AutoResetPointRow{
×
818
                        BinaryChecksum: pt.GetBinaryChecksum(),
×
819
                        CreateTime:     time.Unix(0, pt.GetCreatedTimeNano()),
×
820
                        RunID:          pt.GetRunID(),
×
821
                        EventID:        pt.GetFirstDecisionCompletedID(),
×
822
                })
×
823
        }
×
824
        RenderTable(os.Stdout, table, RenderOptions{Color: true, Border: true, PrintDateTime: true})
×
825
}
826

827
// describeWorkflowExecutionResponse is used to print datetime instead of print raw time
828
type describeWorkflowExecutionResponse struct {
829
        ExecutionConfiguration *types.WorkflowExecutionConfiguration
830
        WorkflowExecutionInfo  workflowExecutionInfo
831
        PendingActivities      []*pendingActivityInfo
832
        PendingChildren        []*types.PendingChildExecutionInfo
833
        PendingDecision        *pendingDecisionInfo
834
}
835

836
// workflowExecutionInfo has same fields as types.WorkflowExecutionInfo, but has datetime instead of raw time
837
type workflowExecutionInfo struct {
838
        Execution        *types.WorkflowExecution
839
        Type             *types.WorkflowType
840
        StartTime        *string // change from *int64
841
        CloseTime        *string // change from *int64
842
        CloseStatus      *types.WorkflowExecutionCloseStatus
843
        HistoryLength    int64
844
        ParentDomainID   *string
845
        ParentExecution  *types.WorkflowExecution
846
        Memo             *types.Memo
847
        SearchAttributes map[string]interface{}
848
        AutoResetPoints  *types.ResetPoints
849
        PartitionConfig  map[string]string
850
}
851

852
// pendingActivityInfo has same fields as types.PendingActivityInfo, but different field type for better display
853
type pendingActivityInfo struct {
854
        ActivityID             string
855
        ActivityType           *types.ActivityType
856
        State                  *types.PendingActivityState
857
        ScheduledTimestamp     *string `json:",omitempty"` // change from *int64
858
        LastStartedTimestamp   *string `json:",omitempty"` // change from *int64
859
        HeartbeatDetails       *string `json:",omitempty"` // change from []byte
860
        LastHeartbeatTimestamp *string `json:",omitempty"` // change from *int64
861
        Attempt                int32   `json:",omitempty"`
862
        MaximumAttempts        int32   `json:",omitempty"`
863
        ExpirationTimestamp    *string `json:",omitempty"` // change from *int64
864
        LastFailureReason      *string `json:",omitempty"`
865
        LastWorkerIdentity     string  `json:",omitempty"`
866
        LastFailureDetails     *string `json:",omitempty"` // change from []byte
867
}
868

869
type pendingDecisionInfo struct {
870
        State                      *types.PendingDecisionState
871
        OriginalScheduledTimestamp *string `json:",omitempty"` // change from *int64
872
        ScheduledTimestamp         *string `json:",omitempty"` // change from *int64
873
        StartedTimestamp           *string `json:",omitempty"` // change from *int64
874
        Attempt                    int64   `json:",omitempty"`
875
}
876

877
func convertDescribeWorkflowExecutionResponse(resp *types.DescribeWorkflowExecutionResponse,
878
        wfClient frontend.Client, c *cli.Context) *describeWorkflowExecutionResponse {
4✔
879

4✔
880
        info := resp.WorkflowExecutionInfo
4✔
881
        executionInfo := workflowExecutionInfo{
4✔
882
                Execution:        info.Execution,
4✔
883
                Type:             info.Type,
4✔
884
                StartTime:        common.StringPtr(convertTime(info.GetStartTime(), false)),
4✔
885
                CloseTime:        common.StringPtr(convertTime(info.GetCloseTime(), false)),
4✔
886
                CloseStatus:      info.CloseStatus,
4✔
887
                HistoryLength:    info.HistoryLength,
4✔
888
                ParentDomainID:   info.ParentDomainID,
4✔
889
                ParentExecution:  info.ParentExecution,
4✔
890
                Memo:             info.Memo,
4✔
891
                SearchAttributes: convertSearchAttributesToMapOfInterface(info.SearchAttributes, wfClient, c),
4✔
892
                AutoResetPoints:  info.AutoResetPoints,
4✔
893
                PartitionConfig:  info.PartitionConfig,
4✔
894
        }
4✔
895

4✔
896
        var pendingActs []*pendingActivityInfo
4✔
897
        var tmpAct *pendingActivityInfo
4✔
898
        for _, pa := range resp.PendingActivities {
4✔
899
                tmpAct = &pendingActivityInfo{
×
900
                        ActivityID:             pa.ActivityID,
×
901
                        ActivityType:           pa.ActivityType,
×
902
                        State:                  pa.State,
×
903
                        ScheduledTimestamp:     timestampPtrToStringPtr(pa.ScheduledTimestamp, false),
×
904
                        LastStartedTimestamp:   timestampPtrToStringPtr(pa.LastStartedTimestamp, false),
×
905
                        LastHeartbeatTimestamp: timestampPtrToStringPtr(pa.LastHeartbeatTimestamp, false),
×
906
                        Attempt:                pa.Attempt,
×
907
                        MaximumAttempts:        pa.MaximumAttempts,
×
908
                        ExpirationTimestamp:    timestampPtrToStringPtr(pa.ExpirationTimestamp, false),
×
909
                        LastFailureReason:      pa.LastFailureReason,
×
910
                        LastWorkerIdentity:     pa.LastWorkerIdentity,
×
911
                }
×
912
                if pa.HeartbeatDetails != nil {
×
913
                        tmpAct.HeartbeatDetails = common.StringPtr(string(pa.HeartbeatDetails))
×
914
                }
×
915
                if pa.LastFailureDetails != nil {
×
916
                        tmpAct.LastFailureDetails = common.StringPtr(string(pa.LastFailureDetails))
×
917
                }
×
918
                pendingActs = append(pendingActs, tmpAct)
×
919
        }
920

921
        var pendingDecision *pendingDecisionInfo
4✔
922
        if resp.PendingDecision != nil {
4✔
923
                pendingDecision = &pendingDecisionInfo{
×
924
                        State:              resp.PendingDecision.State,
×
925
                        ScheduledTimestamp: timestampPtrToStringPtr(resp.PendingDecision.ScheduledTimestamp, false),
×
926
                        StartedTimestamp:   timestampPtrToStringPtr(resp.PendingDecision.StartedTimestamp, false),
×
927
                        Attempt:            resp.PendingDecision.Attempt,
×
928
                }
×
929
                // TODO: Idea here is only display decision task original scheduled timestamp if user are
×
930
                // using decision heartbeat. And we should be able to tell whether a decision task has heartbeat
×
931
                // or not by comparing the original scheduled timestamp and scheduled timestamp.
×
932
                // However, currently server may assign different value to original scheduled timestamp and
×
933
                // scheduled time even if there's no decision heartbeat.
×
934
                // if resp.PendingDecision.OriginalScheduledTimestamp != nil &&
×
935
                //         resp.PendingDecision.ScheduledTimestamp != nil &&
×
936
                //         *resp.PendingDecision.OriginalScheduledTimestamp != *resp.PendingDecision.ScheduledTimestamp {
×
937
                //         pendingDecision.OriginalScheduledTimestamp = timestampPtrToStringPtr(resp.PendingDecision.OriginalScheduledTimestamp, false)
×
938
                // }
×
939
        }
×
940

941
        return &describeWorkflowExecutionResponse{
4✔
942
                ExecutionConfiguration: resp.ExecutionConfiguration,
4✔
943
                WorkflowExecutionInfo:  executionInfo,
4✔
944
                PendingActivities:      pendingActs,
4✔
945
                PendingChildren:        resp.PendingChildren,
4✔
946
                PendingDecision:        pendingDecision,
4✔
947
        }
4✔
948
}
949

950
func convertSearchAttributesToMapOfInterface(searchAttributes *types.SearchAttributes,
951
        wfClient frontend.Client, c *cli.Context) map[string]interface{} {
4✔
952

4✔
953
        if searchAttributes == nil || len(searchAttributes.GetIndexedFields()) == 0 {
8✔
954
                return nil
4✔
955
        }
4✔
956

957
        result := make(map[string]interface{})
×
958
        ctx, cancel := newContext(c)
×
959
        defer cancel()
×
960
        validSearchAttributes, err := wfClient.GetSearchAttributes(ctx)
×
961
        if err != nil {
×
962
                ErrorAndExit("Error when get search attributes", err)
×
963
        }
×
964
        validKeys := validSearchAttributes.GetKeys()
×
965

×
966
        indexedFields := searchAttributes.GetIndexedFields()
×
967
        for k, v := range indexedFields {
×
968
                valueType := validKeys[k]
×
969
                deserializedValue, err := common.DeserializeSearchAttributeValue(v, valueType)
×
970
                if err != nil {
×
971
                        ErrorAndExit("Error deserializing search attribute value", err)
×
972
                }
×
973
                result[k] = deserializedValue
×
974
        }
975

976
        return result
×
977
}
978

979
func getAllWorkflowIDsByQuery(c *cli.Context, query string) map[string]bool {
×
980
        wfClient := getWorkflowClient(c)
×
981
        pageSize := 1000
×
982
        var nextPageToken []byte
×
983
        var info []*types.WorkflowExecutionInfo
×
984
        result := map[string]bool{}
×
985
        for {
×
986
                info, nextPageToken = scanWorkflowExecutions(wfClient, pageSize, nextPageToken, query, c)
×
987
                for _, we := range info {
×
988
                        wid := we.Execution.GetWorkflowID()
×
989
                        result[wid] = true
×
990
                }
×
991

992
                if nextPageToken == nil {
×
993
                        break
×
994
                }
995
        }
996
        return result
×
997
}
998

999
func printRunStatus(event *types.HistoryEvent) {
7✔
1000
        switch event.GetEventType() {
7✔
1001
        case types.EventTypeWorkflowExecutionCompleted:
×
1002
                fmt.Printf("  Status: %s\n", colorGreen("COMPLETED"))
×
1003
                fmt.Printf("  Output: %s\n", string(event.WorkflowExecutionCompletedEventAttributes.Result))
×
1004
        case types.EventTypeWorkflowExecutionFailed:
×
1005
                fmt.Printf("  Status: %s\n", colorRed("FAILED"))
×
1006
                fmt.Printf("  Reason: %s\n", event.WorkflowExecutionFailedEventAttributes.GetReason())
×
1007
                fmt.Printf("  Detail: %s\n", string(event.WorkflowExecutionFailedEventAttributes.Details))
×
1008
        case types.EventTypeWorkflowExecutionTimedOut:
×
1009
                fmt.Printf("  Status: %s\n", colorRed("TIMEOUT"))
×
1010
                fmt.Printf("  Timeout Type: %s\n", event.WorkflowExecutionTimedOutEventAttributes.GetTimeoutType())
×
1011
        case types.EventTypeWorkflowExecutionCanceled:
×
1012
                fmt.Printf("  Status: %s\n", colorRed("CANCELED"))
×
1013
                fmt.Printf("  Detail: %s\n", string(event.WorkflowExecutionCanceledEventAttributes.Details))
×
1014
        }
1015
}
1016

1017
// WorkflowRow is a presentation layer entity use to render a table of workflows
1018
type WorkflowRow struct {
1019
        WorkflowType     string                 `header:"Workflow Type" maxLength:"32"`
1020
        WorkflowID       string                 `header:"Workflow ID"`
1021
        RunID            string                 `header:"Run ID"`
1022
        TaskList         string                 `header:"Task List"`
1023
        IsCron           bool                   `header:"Is Cron"`
1024
        StartTime        time.Time              `header:"Start Time"`
1025
        ExecutionTime    time.Time              `header:"Execution Time"`
1026
        EndTime          time.Time              `header:"End Time"`
1027
        Memo             map[string]string      `header:"Memo"`
1028
        SearchAttributes map[string]interface{} `header:"Search Attributes"`
1029
}
1030

1031
func newWorkflowRow(workflow *types.WorkflowExecutionInfo) WorkflowRow {
4✔
1032
        memo := map[string]string{}
4✔
1033
        for k, v := range workflow.Memo.GetFields() {
4✔
1034
                memo[k] = string(v)
×
1035
        }
×
1036

1037
        sa := map[string]interface{}{}
4✔
1038
        for k, v := range workflow.SearchAttributes.GetIndexedFields() {
4✔
1039
                var decodedVal interface{}
×
1040
                json.Unmarshal(v, &decodedVal)
×
1041
                sa[k] = decodedVal
×
1042
        }
×
1043

1044
        return WorkflowRow{
4✔
1045
                WorkflowType:     workflow.Type.GetName(),
4✔
1046
                WorkflowID:       workflow.Execution.GetWorkflowID(),
4✔
1047
                RunID:            workflow.Execution.GetRunID(),
4✔
1048
                TaskList:         workflow.TaskList,
4✔
1049
                IsCron:           workflow.IsCron,
4✔
1050
                StartTime:        time.Unix(0, workflow.GetStartTime()),
4✔
1051
                ExecutionTime:    time.Unix(0, workflow.GetExecutionTime()),
4✔
1052
                EndTime:          time.Unix(0, workflow.GetCloseTime()),
4✔
1053
                Memo:             memo,
4✔
1054
                SearchAttributes: sa,
4✔
1055
        }
4✔
1056
}
1057

1058
func workflowTableOptions(c *cli.Context) RenderOptions {
9✔
1059
        isScanQueryOpen := isQueryOpen(c.String(FlagListQuery))
9✔
1060

9✔
1061
        return RenderOptions{
9✔
1062
                DefaultTemplate: templateTable,
9✔
1063
                Color:           true,
9✔
1064
                PrintDateTime:   c.Bool(FlagPrintDateTime),
9✔
1065
                PrintRawTime:    c.Bool(FlagPrintRawTime),
9✔
1066
                OptionalColumns: map[string]bool{
9✔
1067
                        "End Time":          !(c.Bool(FlagOpen) || isScanQueryOpen),
9✔
1068
                        "Memo":              c.Bool(FlagPrintMemo),
9✔
1069
                        "Search Attributes": c.Bool(FlagPrintSearchAttr),
9✔
1070
                },
9✔
1071
        }
9✔
1072
}
9✔
1073

1074
type getWorkflowPageFn func([]byte) ([]*types.WorkflowExecutionInfo, []byte)
1075

1076
func getAllWorkflows(getWorkflowPage getWorkflowPageFn) []*types.WorkflowExecutionInfo {
1✔
1077
        var all, page []*types.WorkflowExecutionInfo
1✔
1078
        var nextPageToken []byte
1✔
1079
        for {
2✔
1080
                page, nextPageToken = getWorkflowPage(nextPageToken)
1✔
1081
                all = append(all, page...)
1✔
1082
                if len(nextPageToken) == 0 {
2✔
1083
                        break
1✔
1084
                }
1085
        }
1086
        return all
1✔
1087
}
1088

1089
func filterExcludedWorkflows(c *cli.Context, getWorkflowPage getWorkflowPageFn) getWorkflowPageFn {
×
1090
        excludeWIDs := map[string]bool{}
×
1091
        if c.IsSet(FlagListQuery) && c.IsSet(FlagExcludeWorkflowIDByQuery) {
×
1092
                excludeQuery := c.String(FlagExcludeWorkflowIDByQuery)
×
1093
                excludeWIDs = getAllWorkflowIDsByQuery(c, excludeQuery)
×
1094
                fmt.Printf("found %d workflowIDs to exclude\n", len(excludeWIDs))
×
1095
        }
×
1096

1097
        return func(nextPageToken []byte) ([]*types.WorkflowExecutionInfo, []byte) {
×
1098
                page, nextPageToken := getWorkflowPage(nextPageToken)
×
1099
                filtered := make([]*types.WorkflowExecutionInfo, 0, len(page))
×
1100
                for _, workflow := range page {
×
1101
                        if excludeWIDs[workflow.GetExecution().GetWorkflowID()] {
×
1102
                                continue
×
1103
                        }
1104
                        filtered = append(filtered, workflow)
×
1105
                }
1106
                return filtered, nextPageToken
×
1107
        }
1108
}
1109

1110
func displayPagedWorkflows(c *cli.Context, getWorkflowPage getWorkflowPageFn, firstPageOnly bool) {
8✔
1111
        var page []*types.WorkflowExecutionInfo
8✔
1112
        var nextPageToken []byte
8✔
1113
        for {
16✔
1114
                page, nextPageToken = getWorkflowPage(nextPageToken)
8✔
1115

8✔
1116
                displayWorkflows(c, page)
8✔
1117

8✔
1118
                if firstPageOnly {
16✔
1119
                        break
8✔
1120
                }
1121
                if len(nextPageToken) == 0 {
×
1122
                        break
×
1123
                }
1124
                if !showNextPage() {
×
1125
                        break
×
1126
                }
1127
        }
1128
}
1129

1130
func displayAllWorkflows(c *cli.Context, getWorkflowsPage getWorkflowPageFn) {
1✔
1131
        displayWorkflows(c, getAllWorkflows(getWorkflowsPage))
1✔
1132
}
1✔
1133

1134
func displayWorkflows(c *cli.Context, workflows []*types.WorkflowExecutionInfo) {
9✔
1135
        printJSON := c.Bool(FlagPrintJSON)
9✔
1136
        printDecodedRaw := c.Bool(FlagPrintFullyDetail)
9✔
1137
        if printJSON || printDecodedRaw {
9✔
1138
                fmt.Println("[")
×
1139
                printListResults(workflows, printJSON, false)
×
1140
                fmt.Println("]")
×
1141
        } else {
9✔
1142
                tableOptions := workflowTableOptions(c)
9✔
1143
                var table []WorkflowRow
9✔
1144
                for _, workflow := range workflows {
13✔
1145
                        table = append(table, newWorkflowRow(workflow))
4✔
1146
                }
4✔
1147
                Render(c, table, tableOptions)
9✔
1148
        }
1149
}
1150

1151
func listWorkflowExecutions(client frontend.Client, pageSize int, domain, query string, c *cli.Context) getWorkflowPageFn {
×
1152
        return func(nextPageToken []byte) ([]*types.WorkflowExecutionInfo, []byte) {
×
1153
                request := &types.ListWorkflowExecutionsRequest{
×
1154
                        Domain:        domain,
×
1155
                        PageSize:      int32(pageSize),
×
1156
                        NextPageToken: nextPageToken,
×
1157
                        Query:         query,
×
1158
                }
×
1159

×
1160
                ctx, cancel := newContextForLongPoll(c)
×
1161
                defer cancel()
×
1162
                response, err := client.ListWorkflowExecutions(ctx, request)
×
1163
                if err != nil {
×
1164
                        ErrorAndExit("Failed to list workflow.", err)
×
1165
                }
×
1166
                return response.Executions, response.NextPageToken
×
1167
        }
1168
}
1169

1170
func listOpenWorkflow(client frontend.Client, pageSize int, earliestTime, latestTime int64, domain, workflowID, workflowType string, c *cli.Context) getWorkflowPageFn {
7✔
1171
        return func(nextPageToken []byte) ([]*types.WorkflowExecutionInfo, []byte) {
14✔
1172
                request := &types.ListOpenWorkflowExecutionsRequest{
7✔
1173
                        Domain:          domain,
7✔
1174
                        MaximumPageSize: int32(pageSize),
7✔
1175
                        NextPageToken:   nextPageToken,
7✔
1176
                        StartTimeFilter: &types.StartTimeFilter{
7✔
1177
                                EarliestTime: common.Int64Ptr(earliestTime),
7✔
1178
                                LatestTime:   common.Int64Ptr(latestTime),
7✔
1179
                        },
7✔
1180
                }
7✔
1181
                if len(workflowID) > 0 {
8✔
1182
                        request.ExecutionFilter = &types.WorkflowExecutionFilter{WorkflowID: workflowID}
1✔
1183
                }
1✔
1184
                if len(workflowType) > 0 {
8✔
1185
                        request.TypeFilter = &types.WorkflowTypeFilter{Name: workflowType}
1✔
1186
                }
1✔
1187

1188
                ctx, cancel := newContextForLongPoll(c)
7✔
1189
                defer cancel()
7✔
1190
                response, err := client.ListOpenWorkflowExecutions(ctx, request)
7✔
1191
                if err != nil {
7✔
1192
                        ErrorAndExit("Failed to list open workflow.", err)
×
1193
                }
×
1194
                return response.Executions, response.NextPageToken
7✔
1195
        }
1196
}
1197

1198
func listClosedWorkflow(client frontend.Client, pageSize int, earliestTime, latestTime int64, domain, workflowID, workflowType string, workflowStatus types.WorkflowExecutionCloseStatus, c *cli.Context) getWorkflowPageFn {
10✔
1199
        return func(nextPageToken []byte) ([]*types.WorkflowExecutionInfo, []byte) {
20✔
1200
                request := &types.ListClosedWorkflowExecutionsRequest{
10✔
1201
                        Domain:          domain,
10✔
1202
                        MaximumPageSize: int32(pageSize),
10✔
1203
                        NextPageToken:   nextPageToken,
10✔
1204
                        StartTimeFilter: &types.StartTimeFilter{
10✔
1205
                                EarliestTime: common.Int64Ptr(earliestTime),
10✔
1206
                                LatestTime:   common.Int64Ptr(latestTime),
10✔
1207
                        },
10✔
1208
                }
10✔
1209
                if len(workflowID) > 0 {
11✔
1210
                        request.ExecutionFilter = &types.WorkflowExecutionFilter{WorkflowID: workflowID}
1✔
1211
                }
1✔
1212
                if len(workflowType) > 0 {
11✔
1213
                        request.TypeFilter = &types.WorkflowTypeFilter{Name: workflowType}
1✔
1214
                }
1✔
1215
                if workflowStatus != workflowStatusNotSet {
10✔
1216
                        request.StatusFilter = &workflowStatus
×
1217
                }
×
1218

1219
                ctx, cancel := newContextForLongPoll(c)
10✔
1220
                defer cancel()
10✔
1221
                response, err := client.ListClosedWorkflowExecutions(ctx, request)
10✔
1222
                if err != nil {
10✔
1223
                        ErrorAndExit("Failed to list closed workflow.", err)
×
1224
                }
×
1225
                return response.Executions, response.NextPageToken
10✔
1226
        }
1227
}
1228

1229
func listWorkflows(c *cli.Context) getWorkflowPageFn {
8✔
1230
        wfClient := getWorkflowClient(c)
8✔
1231

8✔
1232
        domain := getRequiredGlobalOption(c, FlagDomain)
8✔
1233
        earliestTime := parseTime(c.String(FlagEarliestTime), 0)
8✔
1234
        latestTime := parseTime(c.String(FlagLatestTime), time.Now().UnixNano())
8✔
1235
        workflowID := c.String(FlagWorkflowID)
8✔
1236
        workflowType := c.String(FlagWorkflowType)
8✔
1237
        queryOpen := c.Bool(FlagOpen)
8✔
1238
        pageSize := c.Int(FlagPageSize)
8✔
1239
        if pageSize <= 0 {
8✔
1240
                pageSize = defaultPageSizeForList
×
1241
        }
×
1242

1243
        var workflowStatus types.WorkflowExecutionCloseStatus
8✔
1244
        if c.IsSet(FlagWorkflowStatus) {
8✔
1245
                if queryOpen {
×
1246
                        ErrorAndExit(optionErr, errors.New("you can only filter on status for closed workflow, not open workflow"))
×
1247
                }
×
1248
                workflowStatus = getWorkflowStatus(c.String(FlagWorkflowStatus))
×
1249
        } else {
8✔
1250
                workflowStatus = workflowStatusNotSet
8✔
1251
        }
8✔
1252

1253
        if len(workflowID) > 0 && len(workflowType) > 0 {
8✔
1254
                ErrorAndExit(optionErr, errors.New("you can filter on workflow_id or workflow_type, but not on both"))
×
1255
        }
×
1256

1257
        ctx, cancel := newContextForLongPoll(c)
8✔
1258
        defer cancel()
8✔
1259
        resp, err := wfClient.CountWorkflowExecutions(
8✔
1260
                ctx,
8✔
1261
                &types.CountWorkflowExecutionsRequest{
8✔
1262
                        Domain: domain,
8✔
1263
                        Query:  c.String(FlagListQuery),
8✔
1264
                },
8✔
1265
        )
8✔
1266
        if err != nil {
8✔
1267
                printError("Unable to count workflows. Proceeding with fetching list of workflows...", err)
×
1268
        } else {
8✔
1269
                fmt.Printf("Fetching %v workflows...\n", resp.GetCount())
8✔
1270
        }
8✔
1271

1272
        if c.IsSet(FlagListQuery) {
8✔
1273
                listQuery := c.String(FlagListQuery)
×
1274
                return listWorkflowExecutions(wfClient, pageSize, domain, listQuery, c)
×
1275
        } else if queryOpen {
11✔
1276
                return listOpenWorkflow(wfClient, pageSize, earliestTime, latestTime, domain, workflowID, workflowType, c)
3✔
1277
        } else {
8✔
1278
                return listClosedWorkflow(wfClient, pageSize, earliestTime, latestTime, domain, workflowID, workflowType, workflowStatus, c)
5✔
1279
        }
5✔
1280
}
1281

1282
func listArchivedWorkflows(c *cli.Context) getWorkflowPageFn {
1✔
1283
        wfClient := getWorkflowClient(c)
1✔
1284

1✔
1285
        domain := getRequiredGlobalOption(c, FlagDomain)
1✔
1286
        pageSize := c.Int(FlagPageSize)
1✔
1287
        listQuery := getRequiredOption(c, FlagListQuery)
1✔
1288
        if pageSize <= 0 {
1✔
1289
                pageSize = defaultPageSizeForList
×
1290
        }
×
1291

1292
        contextTimeout := defaultContextTimeoutForListArchivedWorkflow
1✔
1293
        if c.GlobalIsSet(FlagContextTimeout) {
1✔
1294
                contextTimeout = time.Duration(c.GlobalInt(FlagContextTimeout)) * time.Second
×
1295
        }
×
1296

1297
        return func(nextPageToken []byte) ([]*types.WorkflowExecutionInfo, []byte) {
2✔
1298
                request := &types.ListArchivedWorkflowExecutionsRequest{
1✔
1299
                        Domain:        domain,
1✔
1300
                        PageSize:      int32(pageSize),
1✔
1301
                        Query:         listQuery,
1✔
1302
                        NextPageToken: nextPageToken,
1✔
1303
                }
1✔
1304

1✔
1305
                ctx, cancel := context.WithTimeout(context.Background(), contextTimeout)
1✔
1306
                defer cancel()
1✔
1307

1✔
1308
                result, err := wfClient.ListArchivedWorkflowExecutions(ctx, request)
1✔
1309
                if err != nil {
1✔
1310
                        cancel()
×
1311
                        ErrorAndExit("Failed to list archived workflow.", err)
×
1312
                }
×
1313
                return result.Executions, result.NextPageToken
1✔
1314
        }
1315
}
1316

1317
func scanWorkflows(c *cli.Context) getWorkflowPageFn {
×
1318
        wfClient := getWorkflowClient(c)
×
1319
        listQuery := c.String(FlagListQuery)
×
1320
        pageSize := c.Int(FlagPageSize)
×
1321
        if pageSize <= 0 {
×
1322
                pageSize = defaultPageSizeForScan
×
1323
        }
×
1324

1325
        return func(nextPageToken []byte) ([]*types.WorkflowExecutionInfo, []byte) {
×
1326
                return scanWorkflowExecutions(wfClient, pageSize, nextPageToken, listQuery, c)
×
1327
        }
×
1328
}
1329

1330
func scanWorkflowExecutions(client frontend.Client, pageSize int, nextPageToken []byte, query string, c *cli.Context) ([]*types.WorkflowExecutionInfo, []byte) {
×
1331
        domain := getRequiredGlobalOption(c, FlagDomain)
×
1332

×
1333
        request := &types.ListWorkflowExecutionsRequest{
×
1334
                Domain:        domain,
×
1335
                PageSize:      int32(pageSize),
×
1336
                NextPageToken: nextPageToken,
×
1337
                Query:         query,
×
1338
        }
×
1339
        ctx, cancel := newContextForLongPoll(c)
×
1340
        defer cancel()
×
1341
        response, err := client.ScanWorkflowExecutions(ctx, request)
×
1342
        if err != nil {
×
1343
                ErrorAndExit("Failed to list workflow.", err)
×
1344
        }
×
1345
        return response.Executions, response.NextPageToken
×
1346
}
1347

1348
func getWorkflowStatus(statusStr string) types.WorkflowExecutionCloseStatus {
×
1349
        if status, ok := workflowClosedStatusMap[strings.ToLower(statusStr)]; ok {
×
1350
                return status
×
1351
        }
×
1352
        ErrorAndExit(optionErr, errors.New("option status is not one of allowed values "+
×
1353
                "[completed, failed, canceled, terminated, continued_as_new, timed_out]"))
×
1354
        return 0
×
1355
}
1356

1357
func getWorkflowIDReusePolicy(value int) *types.WorkflowIDReusePolicy {
3✔
1358
        if value >= 0 && types.WorkflowIDReusePolicy(value) <= types.WorkflowIDReusePolicyTerminateIfRunning {
4✔
1359
                return types.WorkflowIDReusePolicy(value).Ptr()
1✔
1360
        }
1✔
1361
        // At this point, the policy should return if the value is valid
1362
        ErrorAndExit(fmt.Sprintf("Option %v value is not in supported range.", FlagWorkflowIDReusePolicy), nil)
2✔
1363
        return nil
2✔
1364
}
1365

1366
// default will print decoded raw
1367
func printListResults(executions []*types.WorkflowExecutionInfo, inJSON bool, more bool) {
×
1368
        for i, execution := range executions {
×
1369
                if inJSON {
×
1370
                        j, _ := json.Marshal(execution)
×
1371
                        if more || i < len(executions)-1 {
×
1372
                                fmt.Println(string(j) + ",")
×
1373
                        } else {
×
1374
                                fmt.Println(string(j))
×
1375
                        }
×
1376
                } else {
×
1377
                        if more || i < len(executions)-1 {
×
1378
                                fmt.Println(anyToString(execution, true, 0) + ",")
×
1379
                        } else {
×
1380
                                fmt.Println(anyToString(execution, true, 0))
×
1381
                        }
×
1382
                }
1383
        }
1384
}
1385

1386
// ObserveHistory show the process of running workflow
1387
func ObserveHistory(c *cli.Context) {
2✔
1388
        wid := getRequiredOption(c, FlagWorkflowID)
2✔
1389
        rid := c.String(FlagRunID)
2✔
1390
        domain := getRequiredGlobalOption(c, FlagDomain)
2✔
1391

2✔
1392
        printWorkflowProgress(c, domain, wid, rid)
2✔
1393
}
2✔
1394

1395
// ResetWorkflow reset workflow
1396
func ResetWorkflow(c *cli.Context) {
×
1397
        domain := getRequiredGlobalOption(c, FlagDomain)
×
1398
        wid := getRequiredOption(c, FlagWorkflowID)
×
1399
        reason := getRequiredOption(c, FlagReason)
×
1400
        if len(reason) == 0 {
×
1401
                ErrorAndExit("wrong reason", fmt.Errorf("reason cannot be empty"))
×
1402
        }
×
1403
        eventID := c.Int64(FlagEventID)
×
1404
        resetType := c.String(FlagResetType)
×
1405
        decisionOffset := c.Int(FlagDecisionOffset)
×
1406
        if decisionOffset > 0 {
×
1407
                ErrorAndExit("Only decision offset <=0 is supported", nil)
×
1408
        }
×
1409

1410
        extraForResetType, ok := resetTypesMap[resetType]
×
1411
        if !ok && eventID <= 0 {
×
1412
                ErrorAndExit("Must specify valid eventID or valid resetType", nil)
×
1413
        }
×
1414
        if ok && len(extraForResetType) > 0 {
×
1415
                getRequiredOption(c, extraForResetType)
×
1416
        }
×
1417

1418
        ctx, cancel := newContext(c)
×
1419
        defer cancel()
×
1420

×
1421
        frontendClient := cFactory.ServerFrontendClient(c)
×
1422
        rid := c.String(FlagRunID)
×
1423
        var err error
×
1424
        if rid == "" {
×
1425
                rid, err = getCurrentRunID(ctx, domain, wid, frontendClient)
×
1426
                if err != nil {
×
1427
                        ErrorAndExit("Cannot get latest RunID as default", err)
×
1428
                }
×
1429
        }
1430

1431
        resetBaseRunID := rid
×
1432
        decisionFinishID := eventID
×
1433
        if resetType != "" {
×
1434
                resetBaseRunID, decisionFinishID, err = getResetEventIDByType(ctx, c, resetType, decisionOffset, domain, wid, rid, frontendClient)
×
1435
                if err != nil {
×
1436
                        ErrorAndExit("getResetEventIDByType failed", err)
×
1437
                }
×
1438
        }
1439
        resp, err := frontendClient.ResetWorkflowExecution(ctx, &types.ResetWorkflowExecutionRequest{
×
1440
                Domain: domain,
×
1441
                WorkflowExecution: &types.WorkflowExecution{
×
1442
                        WorkflowID: wid,
×
1443
                        RunID:      resetBaseRunID,
×
1444
                },
×
1445
                Reason:                fmt.Sprintf("%v:%v", getCurrentUserFromEnv(), reason),
×
1446
                DecisionFinishEventID: decisionFinishID,
×
1447
                RequestID:             uuid.New(),
×
1448
                SkipSignalReapply:     c.Bool(FlagSkipSignalReapply),
×
1449
        })
×
1450
        if err != nil {
×
1451
                ErrorAndExit("reset failed", err)
×
1452
        }
×
1453
        prettyPrintJSONObject(resp)
×
1454
}
1455

1456
func processResets(c *cli.Context, domain string, wes chan types.WorkflowExecution, done chan bool, wg *sync.WaitGroup, params batchResetParamsType) {
×
1457
        for {
×
1458
                select {
×
1459
                case we := <-wes:
×
1460
                        fmt.Println("received: ", we.GetWorkflowID(), we.GetRunID())
×
1461
                        wid := we.GetWorkflowID()
×
1462
                        rid := we.GetRunID()
×
1463
                        var err error
×
1464
                        for i := 0; i < 3; i++ {
×
1465
                                err = doReset(c, domain, wid, rid, params)
×
1466
                                if err == nil {
×
1467
                                        break
×
1468
                                }
1469
                                if _, ok := err.(*types.BadRequestError); ok {
×
1470
                                        break
×
1471
                                }
1472
                                fmt.Println("failed and retry...: ", wid, rid, err)
×
1473
                                time.Sleep(time.Millisecond * time.Duration(rand.Intn(2000)))
×
1474
                        }
1475
                        time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
×
1476
                        if err != nil {
×
1477
                                fmt.Println("[ERROR] failed processing: ", wid, rid, err.Error())
×
1478
                        }
×
1479
                case <-done:
×
1480
                        wg.Done()
×
1481
                        return
×
1482
                }
1483
        }
1484
}
1485

1486
type batchResetParamsType struct {
1487
        reason               string
1488
        skipCurrentOpen      bool
1489
        skipCurrentCompleted bool
1490
        nonDeterministicOnly bool
1491
        skipBaseNotCurrent   bool
1492
        dryRun               bool
1493
        resetType            string
1494
        decisionOffset       int
1495
        skipSignalReapply    bool
1496
}
1497

1498
// ResetInBatch resets workflow in batch
1499
func ResetInBatch(c *cli.Context) {
×
1500
        domain := getRequiredGlobalOption(c, FlagDomain)
×
1501
        resetType := getRequiredOption(c, FlagResetType)
×
1502
        decisionOffset := c.Int(FlagDecisionOffset)
×
1503
        if decisionOffset > 0 {
×
1504
                ErrorAndExit("Only decision offset <=0 is supported", nil)
×
1505
        }
×
1506

1507
        inFileName := c.String(FlagInputFile)
×
1508
        query := c.String(FlagListQuery)
×
1509
        excludeFileName := c.String(FlagExcludeFile)
×
1510
        excludeQuery := c.String(FlagExcludeWorkflowIDByQuery)
×
1511
        separator := c.String(FlagInputSeparator)
×
1512
        parallel := c.Int(FlagParallism)
×
1513

×
1514
        extraForResetType, ok := resetTypesMap[resetType]
×
1515
        if !ok {
×
1516
                ErrorAndExit("Not supported reset type", nil)
×
1517
        } else if len(extraForResetType) > 0 {
×
1518
                getRequiredOption(c, extraForResetType)
×
1519
        }
×
1520

1521
        if excludeFileName != "" && excludeQuery != "" {
×
1522
                ErrorAndExit("Only one of the excluding option is allowed", nil)
×
1523
        }
×
1524

1525
        batchResetParams := batchResetParamsType{
×
1526
                reason:               getRequiredOption(c, FlagReason),
×
1527
                skipCurrentOpen:      c.Bool(FlagSkipCurrentOpen),
×
1528
                skipCurrentCompleted: c.Bool(FlagSkipCurrentCompleted),
×
1529
                nonDeterministicOnly: c.Bool(FlagNonDeterministicOnly),
×
1530
                skipBaseNotCurrent:   c.Bool(FlagSkipBaseIsNotCurrent),
×
1531
                dryRun:               c.Bool(FlagDryRun),
×
1532
                resetType:            resetType,
×
1533
                decisionOffset:       decisionOffset,
×
1534
                skipSignalReapply:    c.Bool(FlagSkipSignalReapply),
×
1535
        }
×
1536

×
1537
        if inFileName == "" && query == "" {
×
1538
                ErrorAndExit("Must provide input file or list query to get target workflows to reset", nil)
×
1539
        }
×
1540

1541
        wg := &sync.WaitGroup{}
×
1542

×
1543
        wes := make(chan types.WorkflowExecution)
×
1544
        done := make(chan bool)
×
1545
        for i := 0; i < parallel; i++ {
×
1546
                wg.Add(1)
×
1547
                go processResets(c, domain, wes, done, wg, batchResetParams)
×
1548
        }
×
1549

1550
        // read excluded workflowIDs
1551
        excludeWIDs := map[string]bool{}
×
1552
        if excludeFileName != "" {
×
1553
                excludeWIDs = loadWorkflowIDsFromFile(excludeFileName, separator)
×
1554
        }
×
1555
        if excludeQuery != "" {
×
1556
                excludeWIDs = getAllWorkflowIDsByQuery(c, excludeQuery)
×
1557
        }
×
1558

1559
        fmt.Println("num of excluded WorkflowIDs:", len(excludeWIDs))
×
1560

×
1561
        if len(inFileName) > 0 {
×
1562
                inFile, err := os.Open(inFileName)
×
1563
                if err != nil {
×
1564
                        ErrorAndExit("Open failed", err)
×
1565
                }
×
1566
                defer inFile.Close()
×
1567
                scanner := bufio.NewScanner(inFile)
×
1568
                idx := 0
×
1569
                for scanner.Scan() {
×
1570
                        idx++
×
1571
                        line := strings.TrimSpace(scanner.Text())
×
1572
                        if len(line) == 0 {
×
1573
                                fmt.Printf("line %v is empty, skipped\n", idx)
×
1574
                                continue
×
1575
                        }
1576
                        cols := strings.Split(line, separator)
×
1577
                        if len(cols) < 1 {
×
1578
                                ErrorAndExit("Split failed", fmt.Errorf("line %v has less than 1 cols separated by comma, only %v ", idx, len(cols)))
×
1579
                        }
×
1580
                        fmt.Printf("Start processing line %v ...\n", idx)
×
1581
                        wid := strings.TrimSpace(cols[0])
×
1582
                        rid := ""
×
1583
                        if len(cols) > 1 {
×
1584
                                rid = strings.TrimSpace(cols[1])
×
1585
                        }
×
1586

1587
                        if excludeWIDs[wid] {
×
1588
                                fmt.Println("skip by exclude file: ", wid, rid)
×
1589
                                continue
×
1590
                        }
1591

1592
                        wes <- types.WorkflowExecution{
×
1593
                                WorkflowID: wid,
×
1594
                                RunID:      rid,
×
1595
                        }
×
1596
                }
1597
        } else {
×
1598
                wfClient := getWorkflowClient(c)
×
1599
                pageSize := 1000
×
1600
                var nextPageToken []byte
×
1601
                var result []*types.WorkflowExecutionInfo
×
1602
                for {
×
1603
                        result, nextPageToken = scanWorkflowExecutions(wfClient, pageSize, nextPageToken, query, c)
×
1604
                        for _, we := range result {
×
1605
                                wid := we.Execution.GetWorkflowID()
×
1606
                                rid := we.Execution.GetRunID()
×
1607
                                if excludeWIDs[wid] {
×
1608
                                        fmt.Println("skip by exclude file: ", wid, rid)
×
1609
                                        continue
×
1610
                                }
1611

1612
                                wes <- types.WorkflowExecution{
×
1613
                                        WorkflowID: wid,
×
1614
                                        RunID:      rid,
×
1615
                                }
×
1616
                        }
1617

1618
                        if nextPageToken == nil {
×
1619
                                break
×
1620
                        }
1621
                }
1622
        }
1623

1624
        close(done)
×
1625
        fmt.Println("wait for all goroutines...")
×
1626
        wg.Wait()
×
1627
}
1628

1629
func loadWorkflowIDsFromFile(excludeFileName, separator string) map[string]bool {
×
1630
        excludeWIDs := map[string]bool{}
×
1631
        if len(excludeFileName) > 0 {
×
1632
                // This code is only used in the CLI. The input provided is from a trusted user.
×
1633
                // #nosec
×
1634
                excFile, err := os.Open(excludeFileName)
×
1635
                if err != nil {
×
1636
                        ErrorAndExit("Open failed2", err)
×
1637
                }
×
1638
                defer excFile.Close()
×
1639
                scanner := bufio.NewScanner(excFile)
×
1640
                idx := 0
×
1641
                for scanner.Scan() {
×
1642
                        idx++
×
1643
                        line := strings.TrimSpace(scanner.Text())
×
1644
                        if len(line) == 0 {
×
1645
                                fmt.Printf("line %v is empty, skipped\n", idx)
×
1646
                                continue
×
1647
                        }
1648
                        cols := strings.Split(line, separator)
×
1649
                        if len(cols) < 1 {
×
1650
                                ErrorAndExit("Split failed", fmt.Errorf("line %v has less than 1 cols separated by comma, only %v ", idx, len(cols)))
×
1651
                        }
×
1652
                        wid := strings.TrimSpace(cols[0])
×
1653
                        excludeWIDs[wid] = true
×
1654
                }
1655
        }
1656
        return excludeWIDs
×
1657
}
1658

1659
func printErrorAndReturn(msg string, err error) error {
×
1660
        fmt.Println(msg)
×
1661
        return err
×
1662
}
×
1663

1664
func doReset(c *cli.Context, domain, wid, rid string, params batchResetParamsType) error {
×
1665
        ctx, cancel := newContext(c)
×
1666
        defer cancel()
×
1667

×
1668
        frontendClient := cFactory.ServerFrontendClient(c)
×
1669
        resp, err := frontendClient.DescribeWorkflowExecution(ctx, &types.DescribeWorkflowExecutionRequest{
×
1670
                Domain: domain,
×
1671
                Execution: &types.WorkflowExecution{
×
1672
                        WorkflowID: wid,
×
1673
                },
×
1674
        })
×
1675
        if err != nil {
×
1676
                return printErrorAndReturn("DescribeWorkflowExecution failed", err)
×
1677
        }
×
1678

1679
        currentRunID := resp.WorkflowExecutionInfo.Execution.GetRunID()
×
1680
        if currentRunID != rid && params.skipBaseNotCurrent {
×
1681
                fmt.Println("skip because base run is different from current run: ", wid, rid, currentRunID)
×
1682
                return nil
×
1683
        }
×
1684
        if rid == "" {
×
1685
                rid = currentRunID
×
1686
        }
×
1687

1688
        if resp.WorkflowExecutionInfo.CloseStatus == nil || resp.WorkflowExecutionInfo.CloseTime == nil {
×
1689
                if params.skipCurrentOpen {
×
1690
                        fmt.Println("skip because current run is open: ", wid, rid, currentRunID)
×
1691
                        return nil
×
1692
                }
×
1693
        }
1694

1695
        if resp.WorkflowExecutionInfo.GetCloseStatus() == types.WorkflowExecutionCloseStatusCompleted {
×
1696
                if params.skipCurrentCompleted {
×
1697
                        fmt.Println("skip because current run is completed: ", wid, rid, currentRunID)
×
1698
                        return nil
×
1699
                }
×
1700
        }
1701

1702
        if params.nonDeterministicOnly {
×
1703
                isLDN, err := isLastEventDecisionTaskFailedWithNonDeterminism(ctx, domain, wid, rid, frontendClient)
×
1704
                if err != nil {
×
1705
                        return printErrorAndReturn("check isLastEventDecisionTaskFailedWithNonDeterminism failed", err)
×
1706
                }
×
1707
                if !isLDN {
×
1708
                        fmt.Println("skip because last event is not DecisionTaskFailedWithNonDeterminism")
×
1709
                        return nil
×
1710
                }
×
1711
        }
1712

1713
        resetBaseRunID, decisionFinishID, err := getResetEventIDByType(ctx, c, params.resetType, params.decisionOffset, domain, wid, rid, frontendClient)
×
1714
        if err != nil {
×
1715
                return printErrorAndReturn("getResetEventIDByType failed", err)
×
1716
        }
×
1717
        fmt.Println("DecisionFinishEventId for reset:", wid, rid, resetBaseRunID, decisionFinishID)
×
1718

×
1719
        if params.dryRun {
×
1720
                fmt.Printf("dry run to reset wid: %v, rid:%v to baseRunID:%v, eventID:%v \n", wid, rid, resetBaseRunID, decisionFinishID)
×
1721
        } else {
×
1722
                resp2, err := frontendClient.ResetWorkflowExecution(ctx, &types.ResetWorkflowExecutionRequest{
×
1723
                        Domain: domain,
×
1724
                        WorkflowExecution: &types.WorkflowExecution{
×
1725
                                WorkflowID: wid,
×
1726
                                RunID:      resetBaseRunID,
×
1727
                        },
×
1728
                        DecisionFinishEventID: decisionFinishID,
×
1729
                        RequestID:             uuid.New(),
×
1730
                        Reason:                fmt.Sprintf("%v:%v", getCurrentUserFromEnv(), params.reason),
×
1731
                        SkipSignalReapply:     params.skipSignalReapply,
×
1732
                })
×
1733

×
1734
                if err != nil {
×
1735
                        return printErrorAndReturn("ResetWorkflowExecution failed", err)
×
1736
                }
×
1737
                fmt.Println("new runID for wid/rid is ,", wid, rid, resp2.GetRunID())
×
1738
        }
1739

1740
        return nil
×
1741
}
1742

1743
func isLastEventDecisionTaskFailedWithNonDeterminism(ctx context.Context, domain, wid, rid string, frontendClient frontend.Client) (bool, error) {
×
1744
        req := &types.GetWorkflowExecutionHistoryRequest{
×
1745
                Domain: domain,
×
1746
                Execution: &types.WorkflowExecution{
×
1747
                        WorkflowID: wid,
×
1748
                        RunID:      rid,
×
1749
                },
×
1750
                MaximumPageSize: 1000,
×
1751
                NextPageToken:   nil,
×
1752
        }
×
1753

×
1754
        var firstEvent, decisionFailed *types.HistoryEvent
×
1755
        for {
×
1756
                resp, err := frontendClient.GetWorkflowExecutionHistory(ctx, req)
×
1757
                if err != nil {
×
1758
                        return false, printErrorAndReturn("GetWorkflowExecutionHistory failed", err)
×
1759
                }
×
1760
                for _, e := range resp.GetHistory().GetEvents() {
×
1761
                        if firstEvent == nil {
×
1762
                                firstEvent = e
×
1763
                        }
×
1764
                        if e.GetEventType() == types.EventTypeDecisionTaskFailed {
×
1765
                                decisionFailed = e
×
1766
                        } else if e.GetEventType() == types.EventTypeDecisionTaskCompleted {
×
1767
                                decisionFailed = nil
×
1768
                        }
×
1769
                }
1770
                if len(resp.NextPageToken) != 0 {
×
1771
                        req.NextPageToken = resp.NextPageToken
×
1772
                } else {
×
1773
                        break
×
1774
                }
1775
        }
1776

1777
        if decisionFailed != nil {
×
1778
                attr := decisionFailed.GetDecisionTaskFailedEventAttributes()
×
1779
                if attr.GetCause() == types.DecisionTaskFailedCauseWorkflowWorkerUnhandledFailure ||
×
1780
                        strings.Contains(string(attr.GetDetails()), "nondeterministic") {
×
1781
                        fmt.Printf("found non-deterministic workflow wid:%v, rid:%v, originalStartTime:%v \n", wid, rid, time.Unix(0, firstEvent.GetTimestamp()))
×
1782
                        return true, nil
×
1783
                }
×
1784
        }
1785

1786
        return false, nil
×
1787
}
1788

1789
func getResetEventIDByType(
1790
        ctx context.Context,
1791
        c *cli.Context,
1792
        resetType string, decisionOffset int,
1793
        domain, wid, rid string,
1794
        frontendClient frontend.Client,
1795
) (resetBaseRunID string, decisionFinishID int64, err error) {
×
1796
        // default to the same runID
×
1797
        resetBaseRunID = rid
×
1798

×
1799
        fmt.Println("resetType:", resetType)
×
1800
        switch resetType {
×
1801
        case resetTypeLastDecisionCompleted:
×
1802
                decisionFinishID, err = getLastDecisionTaskByType(ctx, domain, wid, rid, frontendClient, types.EventTypeDecisionTaskCompleted, decisionOffset)
×
1803
                if err != nil {
×
1804
                        return
×
1805
                }
×
1806
        case resetTypeLastContinuedAsNew:
×
1807
                // this reset type may change the base runID
×
1808
                resetBaseRunID, decisionFinishID, err = getLastContinueAsNewID(ctx, domain, wid, rid, frontendClient)
×
1809
                if err != nil {
×
1810
                        return
×
1811
                }
×
1812
        case resetTypeFirstDecisionCompleted:
×
1813
                decisionFinishID, err = getFirstDecisionTaskByType(ctx, domain, wid, rid, frontendClient, types.EventTypeDecisionTaskCompleted)
×
1814
                if err != nil {
×
1815
                        return
×
1816
                }
×
1817
        case resetTypeBadBinary:
×
1818
                binCheckSum := c.String(FlagResetBadBinaryChecksum)
×
1819
                decisionFinishID, err = getBadDecisionCompletedID(ctx, domain, wid, rid, binCheckSum, frontendClient)
×
1820
                if err != nil {
×
1821
                        return
×
1822
                }
×
1823
        case resetTypeDecisionCompletedTime:
×
1824
                earliestTime := parseTime(c.String(FlagEarliestTime), 0)
×
1825
                decisionFinishID, err = getEarliestDecisionID(ctx, domain, wid, rid, earliestTime, frontendClient)
×
1826
                if err != nil {
×
1827
                        return
×
1828
                }
×
1829
        case resetTypeFirstDecisionScheduled:
×
1830
                decisionFinishID, err = getFirstDecisionTaskByType(ctx, domain, wid, rid, frontendClient, types.EventTypeDecisionTaskScheduled)
×
1831
                if err != nil {
×
1832
                        return
×
1833
                }
×
1834
                // decisionFinishID is exclusive in reset API
1835
                decisionFinishID++
×
1836
        case resetTypeLastDecisionScheduled:
×
1837
                decisionFinishID, err = getLastDecisionTaskByType(ctx, domain, wid, rid, frontendClient, types.EventTypeDecisionTaskScheduled, decisionOffset)
×
1838
                if err != nil {
×
1839
                        return
×
1840
                }
×
1841
                // decisionFinishID is exclusive in reset API
1842
                decisionFinishID++
×
1843
        default:
×
1844
                panic("not supported resetType")
×
1845
        }
1846
        return
×
1847
}
1848

1849
func getFirstDecisionTaskByType(
1850
        ctx context.Context,
1851
        domain string,
1852
        workflowID string,
1853
        runID string,
1854
        frontendClient frontend.Client,
1855
        decisionType types.EventType,
1856
) (decisionFinishID int64, err error) {
×
1857

×
1858
        req := &types.GetWorkflowExecutionHistoryRequest{
×
1859
                Domain: domain,
×
1860
                Execution: &types.WorkflowExecution{
×
1861
                        WorkflowID: workflowID,
×
1862
                        RunID:      runID,
×
1863
                },
×
1864
                MaximumPageSize: 1000,
×
1865
                NextPageToken:   nil,
×
1866
        }
×
1867

×
1868
        for {
×
1869
                resp, err := frontendClient.GetWorkflowExecutionHistory(ctx, req)
×
1870
                if err != nil {
×
1871
                        return 0, printErrorAndReturn("GetWorkflowExecutionHistory failed", err)
×
1872
                }
×
1873

1874
                for _, e := range resp.GetHistory().GetEvents() {
×
1875
                        if e.GetEventType() == decisionType {
×
1876
                                decisionFinishID = e.ID
×
1877
                                return decisionFinishID, nil
×
1878
                        }
×
1879
                }
1880

1881
                if len(resp.NextPageToken) != 0 {
×
1882
                        req.NextPageToken = resp.NextPageToken
×
1883
                } else {
×
1884
                        break
×
1885
                }
1886
        }
1887
        if decisionFinishID == 0 {
×
1888
                return 0, printErrorAndReturn("Get DecisionFinishID failed", fmt.Errorf("no DecisionFinishID"))
×
1889
        }
×
1890
        return
×
1891
}
1892

1893
func getCurrentRunID(ctx context.Context, domain, wid string, frontendClient frontend.Client) (string, error) {
×
1894
        resp, err := frontendClient.DescribeWorkflowExecution(ctx, &types.DescribeWorkflowExecutionRequest{
×
1895
                Domain: domain,
×
1896
                Execution: &types.WorkflowExecution{
×
1897
                        WorkflowID: wid,
×
1898
                },
×
1899
        })
×
1900
        if err != nil {
×
1901
                return "", err
×
1902
        }
×
1903
        return resp.WorkflowExecutionInfo.Execution.GetRunID(), nil
×
1904
}
1905

1906
func getBadDecisionCompletedID(ctx context.Context, domain, wid, rid, binChecksum string, frontendClient frontend.Client) (decisionFinishID int64, err error) {
×
1907
        resp, err := frontendClient.DescribeWorkflowExecution(ctx, &types.DescribeWorkflowExecutionRequest{
×
1908
                Domain: domain,
×
1909
                Execution: &types.WorkflowExecution{
×
1910
                        WorkflowID: wid,
×
1911
                        RunID:      rid,
×
1912
                },
×
1913
        })
×
1914
        if err != nil {
×
1915
                return 0, printErrorAndReturn("DescribeWorkflowExecution failed", err)
×
1916
        }
×
1917

1918
        _, p := execution.FindAutoResetPoint(clock.NewRealTimeSource(), &types.BadBinaries{
×
1919
                Binaries: map[string]*types.BadBinaryInfo{
×
1920
                        binChecksum: {},
×
1921
                },
×
1922
        }, resp.WorkflowExecutionInfo.AutoResetPoints)
×
1923
        if p != nil {
×
1924
                decisionFinishID = p.GetFirstDecisionCompletedID()
×
1925
        }
×
1926

1927
        if decisionFinishID == 0 {
×
1928
                return 0, printErrorAndReturn("Get DecisionFinishID failed", &types.BadRequestError{Message: "no DecisionFinishID"})
×
1929
        }
×
1930
        return
×
1931
}
1932

1933
func getLastDecisionTaskByType(
1934
        ctx context.Context,
1935
        domain string,
1936
        workflowID string,
1937
        runID string,
1938
        frontendClient frontend.Client,
1939
        decisionType types.EventType,
1940
        decisionOffset int,
1941
) (int64, error) {
×
1942

×
1943
        // this fixedSizeQueue is for remembering the offset decision eventID
×
1944
        fixedSizeQueue := make([]int64, 0)
×
1945
        size := int(math.Abs(float64(decisionOffset))) + 1
×
1946

×
1947
        req := &types.GetWorkflowExecutionHistoryRequest{
×
1948
                Domain: domain,
×
1949
                Execution: &types.WorkflowExecution{
×
1950
                        WorkflowID: workflowID,
×
1951
                        RunID:      runID,
×
1952
                },
×
1953
                MaximumPageSize: 1000,
×
1954
                NextPageToken:   nil,
×
1955
        }
×
1956

×
1957
        for {
×
1958
                resp, err := frontendClient.GetWorkflowExecutionHistory(ctx, req)
×
1959
                if err != nil {
×
1960
                        return 0, printErrorAndReturn("GetWorkflowExecutionHistory failed", err)
×
1961
                }
×
1962

1963
                for _, e := range resp.GetHistory().GetEvents() {
×
1964
                        if e.GetEventType() == decisionType {
×
1965
                                decisionEventID := e.ID
×
1966
                                fixedSizeQueue = append(fixedSizeQueue, decisionEventID)
×
1967
                                if len(fixedSizeQueue) > size {
×
1968
                                        fixedSizeQueue = fixedSizeQueue[1:]
×
1969
                                }
×
1970
                        }
1971
                }
1972

1973
                if len(resp.NextPageToken) != 0 {
×
1974
                        req.NextPageToken = resp.NextPageToken
×
1975
                } else {
×
1976
                        break
×
1977
                }
1978
        }
1979
        if len(fixedSizeQueue) == 0 {
×
1980
                return 0, printErrorAndReturn("Get DecisionFinishID failed", fmt.Errorf("no DecisionFinishID"))
×
1981
        }
×
1982
        return fixedSizeQueue[0], nil
×
1983
}
1984

1985
func getLastContinueAsNewID(ctx context.Context, domain, wid, rid string, frontendClient frontend.Client) (resetBaseRunID string, decisionFinishID int64, err error) {
×
1986
        // get first event
×
1987
        req := &types.GetWorkflowExecutionHistoryRequest{
×
1988
                Domain: domain,
×
1989
                Execution: &types.WorkflowExecution{
×
1990
                        WorkflowID: wid,
×
1991
                        RunID:      rid,
×
1992
                },
×
1993
                MaximumPageSize: 1,
×
1994
                NextPageToken:   nil,
×
1995
        }
×
1996
        resp, err := frontendClient.GetWorkflowExecutionHistory(ctx, req)
×
1997
        if err != nil {
×
1998
                return "", 0, printErrorAndReturn("GetWorkflowExecutionHistory failed", err)
×
1999
        }
×
2000
        firstEvent := resp.History.Events[0]
×
2001
        resetBaseRunID = firstEvent.GetWorkflowExecutionStartedEventAttributes().GetContinuedExecutionRunID()
×
2002
        if resetBaseRunID == "" {
×
2003
                return "", 0, printErrorAndReturn("GetWorkflowExecutionHistory failed", fmt.Errorf("cannot get resetBaseRunID"))
×
2004
        }
×
2005

2006
        req = &types.GetWorkflowExecutionHistoryRequest{
×
2007
                Domain: domain,
×
2008
                Execution: &types.WorkflowExecution{
×
2009
                        WorkflowID: wid,
×
2010
                        RunID:      resetBaseRunID,
×
2011
                },
×
2012
                MaximumPageSize: 1000,
×
2013
                NextPageToken:   nil,
×
2014
        }
×
2015
        for {
×
2016
                resp, err := frontendClient.GetWorkflowExecutionHistory(ctx, req)
×
2017
                if err != nil {
×
2018
                        return "", 0, printErrorAndReturn("GetWorkflowExecutionHistory failed", err)
×
2019
                }
×
2020
                for _, e := range resp.GetHistory().GetEvents() {
×
2021
                        if e.GetEventType() == types.EventTypeDecisionTaskCompleted {
×
2022
                                decisionFinishID = e.ID
×
2023
                        }
×
2024
                }
2025
                if len(resp.NextPageToken) != 0 {
×
2026
                        req.NextPageToken = resp.NextPageToken
×
2027
                } else {
×
2028
                        break
×
2029
                }
2030
        }
2031
        if decisionFinishID == 0 {
×
2032
                return "", 0, printErrorAndReturn("Get DecisionFinishID failed", fmt.Errorf("no DecisionFinishID"))
×
2033
        }
×
2034
        return
×
2035
}
2036

2037
// CompleteActivity completes an activity
2038
func CompleteActivity(c *cli.Context) {
×
2039
        domain := getRequiredGlobalOption(c, FlagDomain)
×
2040
        wid := getRequiredOption(c, FlagWorkflowID)
×
2041
        rid := getRequiredOption(c, FlagRunID)
×
2042
        activityID := getRequiredOption(c, FlagActivityID)
×
2043
        if len(activityID) == 0 {
×
2044
                ErrorAndExit("Invalid activityID", fmt.Errorf("activityID cannot be empty"))
×
2045
        }
×
2046
        result := getRequiredOption(c, FlagResult)
×
2047
        identity := getRequiredOption(c, FlagIdentity)
×
2048
        ctx, cancel := newContext(c)
×
2049
        defer cancel()
×
2050

×
2051
        frontendClient := cFactory.ServerFrontendClient(c)
×
2052
        err := frontendClient.RespondActivityTaskCompletedByID(ctx, &types.RespondActivityTaskCompletedByIDRequest{
×
2053
                Domain:     domain,
×
2054
                WorkflowID: wid,
×
2055
                RunID:      rid,
×
2056
                ActivityID: activityID,
×
2057
                Result:     []byte(result),
×
2058
                Identity:   identity,
×
2059
        })
×
2060
        if err != nil {
×
2061
                ErrorAndExit("Completing activity failed", err)
×
2062
        } else {
×
2063
                fmt.Println("Complete activity successfully.")
×
2064
        }
×
2065
}
2066

2067
// FailActivity fails an activity
2068
func FailActivity(c *cli.Context) {
×
2069
        domain := getRequiredGlobalOption(c, FlagDomain)
×
2070
        wid := getRequiredOption(c, FlagWorkflowID)
×
2071
        rid := getRequiredOption(c, FlagRunID)
×
2072
        activityID := getRequiredOption(c, FlagActivityID)
×
2073
        if len(activityID) == 0 {
×
2074
                ErrorAndExit("Invalid activityID", fmt.Errorf("activityID cannot be empty"))
×
2075
        }
×
2076
        reason := getRequiredOption(c, FlagReason)
×
2077
        detail := getRequiredOption(c, FlagDetail)
×
2078
        identity := getRequiredOption(c, FlagIdentity)
×
2079
        ctx, cancel := newContext(c)
×
2080
        defer cancel()
×
2081

×
2082
        frontendClient := cFactory.ServerFrontendClient(c)
×
2083
        err := frontendClient.RespondActivityTaskFailedByID(ctx, &types.RespondActivityTaskFailedByIDRequest{
×
2084
                Domain:     domain,
×
2085
                WorkflowID: wid,
×
2086
                RunID:      rid,
×
2087
                ActivityID: activityID,
×
2088
                Reason:     common.StringPtr(reason),
×
2089
                Details:    []byte(detail),
×
2090
                Identity:   identity,
×
2091
        })
×
2092
        if err != nil {
×
2093
                ErrorAndExit("Failing activity failed", err)
×
2094
        } else {
×
2095
                fmt.Println("Fail activity successfully.")
×
2096
        }
×
2097
}
2098

2099
// ObserveHistoryWithID show the process of running workflow
2100
func ObserveHistoryWithID(c *cli.Context) {
2✔
2101
        domain := getRequiredGlobalOption(c, FlagDomain)
2✔
2102
        if !c.Args().Present() {
2✔
2103
                ErrorAndExit("Argument workflow_id is required.", nil)
×
2104
        }
×
2105
        wid := c.Args().First()
2✔
2106
        rid := ""
2✔
2107
        if c.NArg() >= 2 {
2✔
2108
                rid = c.Args().Get(1)
×
2109
        }
×
2110

2111
        printWorkflowProgress(c, domain, wid, rid)
2✔
2112
}
2113

2114
func getEarliestDecisionID(
2115
        ctx context.Context,
2116
        domain string, wid string,
2117
        rid string, earliestTime int64,
2118
        frontendClient frontend.Client,
2119
) (decisionFinishID int64, err error) {
×
2120
        req := &types.GetWorkflowExecutionHistoryRequest{
×
2121
                Domain: domain,
×
2122
                Execution: &types.WorkflowExecution{
×
2123
                        WorkflowID: wid,
×
2124
                        RunID:      rid,
×
2125
                },
×
2126
                MaximumPageSize: 1000,
×
2127
                NextPageToken:   nil,
×
2128
        }
×
2129

×
2130
OuterLoop:
×
2131
        for {
×
2132
                resp, err := frontendClient.GetWorkflowExecutionHistory(ctx, req)
×
2133
                if err != nil {
×
2134
                        return 0, printErrorAndReturn("GetWorkflowExecutionHistory failed", err)
×
2135
                }
×
2136
                for _, e := range resp.GetHistory().GetEvents() {
×
2137
                        if e.GetEventType() == types.EventTypeDecisionTaskCompleted {
×
2138
                                if e.GetTimestamp() >= earliestTime {
×
2139
                                        decisionFinishID = e.ID
×
2140
                                        break OuterLoop
×
2141
                                }
2142
                        }
2143
                }
2144
                if len(resp.NextPageToken) != 0 {
×
2145
                        req.NextPageToken = resp.NextPageToken
×
2146
                } else {
×
2147
                        break
×
2148
                }
2149
        }
2150
        if decisionFinishID == 0 {
×
2151
                return 0, printErrorAndReturn("Get DecisionFinishID failed", fmt.Errorf("no DecisionFinishID"))
×
2152
        }
×
2153
        return
×
2154
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc