• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 0186f2b7-dd32-4c49-8df6-51a5df0eae06

18 Mar 2023 03:35AM UTC coverage: 57.076% (+0.05%) from 57.024%
0186f2b7-dd32-4c49-8df6-51a5df0eae06

Pull #5157

buildkite

David Porter
Remove accidentally committed comment junk code
Pull Request #5157: Feature/zonal partitioning admin mapping

176 of 176 new or added lines in 2 files covered. (100.0%)

85504 of 149807 relevant lines covered (57.08%)

2304.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

37.12
/tools/cli/workflowCommands.go
1
// Copyright (c) 2017-2020 Uber Technologies Inc.
2
// Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.
3
//
4
// Permission is hereby granted, free of charge, to any person obtaining a copy
5
// of this software and associated documentation files (the "Software"), to deal
6
// in the Software without restriction, including without limitation the rights
7
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8
// copies of the Software, and to permit persons to whom the Software is
9
// furnished to do so, subject to the following conditions:
10
//
11
// The above copyright notice and this permission notice shall be included in
12
// all copies or substantial portions of the Software.
13
//
14
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20
// THE SOFTWARE.
21

22
package cli
23

24
import (
25
        "bufio"
26
        "context"
27
        "encoding/json"
28
        "errors"
29
        "fmt"
30
        "io/ioutil"
31
        "math"
32
        "math/rand"
33
        "os"
34
        "regexp"
35
        "strconv"
36
        "strings"
37
        "sync"
38
        "time"
39

40
        "github.com/olekukonko/tablewriter"
41
        "github.com/pborman/uuid"
42
        "github.com/urfave/cli"
43

44
        "github.com/uber/cadence/client/frontend"
45
        "github.com/uber/cadence/common"
46
        "github.com/uber/cadence/common/clock"
47
        "github.com/uber/cadence/common/types"
48
        "github.com/uber/cadence/service/history/execution"
49
)
50

51
// RestartWorkflow restarts a workflow execution
52
func RestartWorkflow(c *cli.Context) {
2✔
53
        wfClient := getWorkflowClient(c)
2✔
54

2✔
55
        domain := getRequiredGlobalOption(c, FlagDomain)
2✔
56
        wid := getRequiredOption(c, FlagWorkflowID)
2✔
57
        rid := c.String(FlagRunID)
2✔
58

2✔
59
        ctx, cancel := newContext(c)
2✔
60
        defer cancel()
2✔
61
        resp, err := wfClient.RestartWorkflowExecution(
2✔
62
                ctx,
2✔
63
                &types.RestartWorkflowExecutionRequest{
2✔
64
                        Domain: domain,
2✔
65
                        WorkflowExecution: &types.WorkflowExecution{
2✔
66
                                WorkflowID: wid,
2✔
67
                                RunID:      rid,
2✔
68
                        }, Identity: getCliIdentity(),
2✔
69
                },
2✔
70
        )
2✔
71

2✔
72
        if err != nil {
3✔
73
                ErrorAndExit("Restart workflow failed.", err)
1✔
74
        } else {
2✔
75
                fmt.Printf("Restarted Workflow Id: %s, run Id: %s\n", wid, resp.GetRunID())
1✔
76
        }
1✔
77
}
78

79
// ShowHistory shows the history of given workflow execution based on workflowID and runID.
80
func ShowHistory(c *cli.Context) {
3✔
81
        wid := getRequiredOption(c, FlagWorkflowID)
3✔
82
        rid := c.String(FlagRunID)
3✔
83
        showHistoryHelper(c, wid, rid)
3✔
84
}
3✔
85

86
// ShowHistoryWithWID shows the history of given workflow with workflow_id
87
func ShowHistoryWithWID(c *cli.Context) {
1✔
88
        if !c.Args().Present() {
1✔
89
                ErrorAndExit("Argument workflow_id is required.", nil)
×
90
        }
×
91
        wid := c.Args().First()
1✔
92
        rid := ""
1✔
93
        if c.NArg() >= 2 {
1✔
94
                rid = c.Args().Get(1)
×
95
        }
×
96
        showHistoryHelper(c, wid, rid)
1✔
97
}
98

99
func showHistoryHelper(c *cli.Context, wid, rid string) {
4✔
100
        wfClient := getWorkflowClient(c)
4✔
101

4✔
102
        domain := getRequiredGlobalOption(c, FlagDomain)
4✔
103
        printDateTime := c.Bool(FlagPrintDateTime)
4✔
104
        printRawTime := c.Bool(FlagPrintRawTime)
4✔
105
        printFully := c.Bool(FlagPrintFullyDetail)
4✔
106
        printVersion := c.Bool(FlagPrintEventVersion)
4✔
107
        outputFileName := c.String(FlagOutputFilename)
4✔
108
        var maxFieldLength int
4✔
109
        if c.IsSet(FlagMaxFieldLength) || !printFully {
8✔
110
                maxFieldLength = c.Int(FlagMaxFieldLength)
4✔
111
        }
4✔
112
        resetPointsOnly := c.Bool(FlagResetPointsOnly)
4✔
113

4✔
114
        ctx, cancel := newContext(c)
4✔
115
        defer cancel()
4✔
116
        history, err := GetHistory(ctx, wfClient, domain, wid, rid)
4✔
117
        if err != nil {
4✔
118
                ErrorAndExit(fmt.Sprintf("Failed to get history on workflow id: %s, run id: %s.", wid, rid), err)
×
119
        }
×
120

121
        prevEvent := types.HistoryEvent{}
4✔
122
        if printFully { // dump everything
4✔
123
                for _, e := range history.Events {
×
124
                        if resetPointsOnly {
×
125
                                if prevEvent.GetEventType() != types.EventTypeDecisionTaskStarted {
×
126
                                        prevEvent = *e
×
127
                                        continue
×
128
                                }
129
                                prevEvent = *e
×
130
                        }
131
                        fmt.Println(anyToString(e, true, maxFieldLength))
×
132
                }
133
        } else if c.IsSet(FlagEventID) { // only dump that event
4✔
134
                eventID := c.Int(FlagEventID)
×
135
                if eventID <= 0 || eventID > len(history.Events) {
×
136
                        ErrorAndExit("EventId out of range.", fmt.Errorf("number should be 1 - %d inclusive", len(history.Events)))
×
137
                }
×
138
                e := history.Events[eventID-1]
×
139
                fmt.Println(anyToString(e, true, 0))
×
140
        } else { // use table to pretty output, will trim long text
4✔
141
                table := tablewriter.NewWriter(os.Stdout)
4✔
142
                table.SetBorder(false)
4✔
143
                table.SetColumnSeparator("")
4✔
144
                for _, e := range history.Events {
8✔
145
                        if resetPointsOnly {
4✔
146
                                if prevEvent.GetEventType() != types.EventTypeDecisionTaskStarted {
×
147
                                        prevEvent = *e
×
148
                                        continue
×
149
                                }
150
                                prevEvent = *e
×
151
                        }
152

153
                        columns := []string{}
4✔
154
                        columns = append(columns, strconv.FormatInt(e.ID, 10))
4✔
155

4✔
156
                        if printRawTime {
5✔
157
                                columns = append(columns, strconv.FormatInt(e.GetTimestamp(), 10))
1✔
158
                        } else if printDateTime {
5✔
159
                                columns = append(columns, convertTime(e.GetTimestamp(), false))
1✔
160
                        }
1✔
161
                        if printVersion {
4✔
162
                                columns = append(columns, fmt.Sprintf("(Version: %v)", e.Version))
×
163
                        }
×
164

165
                        columns = append(columns, ColorEvent(e), HistoryEventToString(e, false, maxFieldLength))
4✔
166
                        table.Append(columns)
4✔
167
                }
168
                table.Render()
4✔
169
        }
170

171
        if outputFileName != "" {
4✔
172
                serializer := &JSONHistorySerializer{}
×
173
                data, err := serializer.Serialize(history)
×
174
                if err != nil {
×
175
                        ErrorAndExit("Failed to serialize history data.", err)
×
176
                }
×
177
                if err := ioutil.WriteFile(outputFileName, data, 0666); err != nil {
×
178
                        ErrorAndExit("Failed to export history data file.", err)
×
179
                }
×
180
        }
181

182
        // finally append activities with retry
183
        frontendClient := cFactory.ServerFrontendClient(c)
4✔
184
        resp, err := frontendClient.DescribeWorkflowExecution(ctx, &types.DescribeWorkflowExecutionRequest{
4✔
185
                Domain: domain,
4✔
186
                Execution: &types.WorkflowExecution{
4✔
187
                        WorkflowID: wid,
4✔
188
                        RunID:      rid,
4✔
189
                },
4✔
190
        })
4✔
191
        if err != nil {
4✔
192
                if _, ok := err.(*types.EntityNotExistsError); ok {
×
193
                        fmt.Println("History Source: History Archival")
×
194
                        return
×
195
                }
×
196
                ErrorAndExit("Describe workflow execution failed, cannot get information of pending activities", err)
×
197
        }
198
        fmt.Println("History Source: Default Storage")
4✔
199

4✔
200
        descOutput := convertDescribeWorkflowExecutionResponse(resp, frontendClient, c)
4✔
201
        if len(descOutput.PendingActivities) > 0 {
4✔
202
                fmt.Println("============Workflow Pending activities============")
×
203
                prettyPrintJSONObject(descOutput.PendingActivities)
×
204
                fmt.Println("NOTE: ActivityStartedEvent with retry policy will be written into history when the activity is finished.")
×
205
        }
×
206

207
}
208

209
// StartWorkflow starts a new workflow execution
210
func StartWorkflow(c *cli.Context) {
3✔
211
        startWorkflowHelper(c, false)
3✔
212
}
3✔
213

214
// RunWorkflow starts a new workflow execution and print workflow progress and result
215
func RunWorkflow(c *cli.Context) {
3✔
216
        startWorkflowHelper(c, true)
3✔
217
}
3✔
218

219
func startWorkflowHelper(c *cli.Context, shouldPrintProgress bool) {
6✔
220
        serviceClient := cFactory.ServerFrontendClient(c)
6✔
221

6✔
222
        startRequest := constructStartWorkflowRequest(c)
6✔
223
        domain := startRequest.GetDomain()
6✔
224
        wid := startRequest.GetWorkflowID()
6✔
225
        workflowType := startRequest.WorkflowType.GetName()
6✔
226
        taskList := startRequest.TaskList.GetName()
6✔
227
        input := string(startRequest.Input)
6✔
228

6✔
229
        startFn := func() {
9✔
230
                tcCtx, cancel := newContext(c)
3✔
231
                defer cancel()
3✔
232
                resp, err := serviceClient.StartWorkflowExecution(tcCtx, startRequest)
3✔
233

3✔
234
                if err != nil {
4✔
235
                        ErrorAndExit("Failed to create workflow.", err)
1✔
236
                } else {
3✔
237
                        fmt.Printf("Started Workflow Id: %s, run Id: %s\n", wid, resp.GetRunID())
2✔
238
                }
2✔
239
        }
240

241
        runFn := func() {
9✔
242
                tcCtx, cancel := newContextForLongPoll(c)
3✔
243
                defer cancel()
3✔
244
                resp, err := serviceClient.StartWorkflowExecution(tcCtx, startRequest)
3✔
245

3✔
246
                if err != nil {
4✔
247
                        ErrorAndExit("Failed to run workflow.", err)
1✔
248
                }
1✔
249

250
                // print execution summary
251
                fmt.Println(colorMagenta("Running execution:"))
3✔
252
                table := tablewriter.NewWriter(os.Stdout)
3✔
253
                executionData := [][]string{
3✔
254
                        {"Workflow Id", wid},
3✔
255
                        {"Run Id", resp.GetRunID()},
3✔
256
                        {"Type", workflowType},
3✔
257
                        {"Domain", domain},
3✔
258
                        {"Task List", taskList},
3✔
259
                        {"Args", truncate(input)}, // in case of large input
3✔
260
                }
3✔
261
                table.SetBorder(false)
3✔
262
                table.SetColumnSeparator(":")
3✔
263
                table.AppendBulk(executionData) // Add Bulk Data
3✔
264
                table.Render()
3✔
265

3✔
266
                printWorkflowProgress(c, domain, wid, resp.GetRunID())
3✔
267
        }
268

269
        if shouldPrintProgress {
9✔
270
                runFn()
3✔
271
        } else {
6✔
272
                startFn()
3✔
273
        }
3✔
274
}
275

276
func constructStartWorkflowRequest(c *cli.Context) *types.StartWorkflowExecutionRequest {
6✔
277
        domain := getRequiredGlobalOption(c, FlagDomain)
6✔
278
        taskList := getRequiredOption(c, FlagTaskList)
6✔
279
        workflowType := getRequiredOption(c, FlagWorkflowType)
6✔
280
        et := c.Int(FlagExecutionTimeout)
6✔
281
        if et == 0 {
6✔
282
                ErrorAndExit(fmt.Sprintf("Option %s format is invalid.", FlagExecutionTimeout), nil)
×
283
        }
×
284
        dt := c.Int(FlagDecisionTimeout)
6✔
285
        wid := c.String(FlagWorkflowID)
6✔
286
        if len(wid) == 0 {
8✔
287
                wid = uuid.New()
2✔
288
        }
2✔
289
        reusePolicy := defaultWorkflowIDReusePolicy.Ptr()
6✔
290
        if c.IsSet(FlagWorkflowIDReusePolicy) {
6✔
291
                reusePolicy = getWorkflowIDReusePolicy(c.Int(FlagWorkflowIDReusePolicy))
×
292
        }
×
293

294
        input := processJSONInput(c)
6✔
295
        startRequest := &types.StartWorkflowExecutionRequest{
6✔
296
                RequestID:  uuid.New(),
6✔
297
                Domain:     domain,
6✔
298
                WorkflowID: wid,
6✔
299
                WorkflowType: &types.WorkflowType{
6✔
300
                        Name: workflowType,
6✔
301
                },
6✔
302
                TaskList: &types.TaskList{
6✔
303
                        Name: taskList,
6✔
304
                },
6✔
305
                Input:                               []byte(input),
6✔
306
                ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(int32(et)),
6✔
307
                TaskStartToCloseTimeoutSeconds:      common.Int32Ptr(int32(dt)),
6✔
308
                Identity:                            getCliIdentity(),
6✔
309
                WorkflowIDReusePolicy:               reusePolicy,
6✔
310
        }
6✔
311
        if c.IsSet(FlagCronSchedule) {
6✔
312
                startRequest.CronSchedule = c.String(FlagCronSchedule)
×
313
        }
×
314

315
        if c.IsSet(FlagRetryAttempts) || c.IsSet(FlagRetryExpiration) {
6✔
316
                startRequest.RetryPolicy = &types.RetryPolicy{
×
317
                        InitialIntervalInSeconds: int32(c.Int(FlagRetryInterval)),
×
318
                        BackoffCoefficient:       c.Float64(FlagRetryBackoff),
×
319
                }
×
320

×
321
                if c.IsSet(FlagRetryAttempts) {
×
322
                        startRequest.RetryPolicy.MaximumAttempts = int32(c.Int(FlagRetryAttempts))
×
323
                }
×
324
                if c.IsSet(FlagRetryExpiration) {
×
325
                        startRequest.RetryPolicy.ExpirationIntervalInSeconds = int32(c.Int(FlagRetryExpiration))
×
326
                }
×
327
                if c.IsSet(FlagRetryMaxInterval) {
×
328
                        startRequest.RetryPolicy.MaximumIntervalInSeconds = int32(c.Int(FlagRetryMaxInterval))
×
329
                }
×
330
        }
331

332
        if c.IsSet(DelayStartSeconds) {
6✔
333
                startRequest.DelayStartSeconds = common.Int32Ptr(int32(c.Int(DelayStartSeconds)))
×
334
        }
×
335

336
        if c.IsSet(JitterStartSeconds) {
6✔
337
                startRequest.JitterStartSeconds = common.Int32Ptr(int32(c.Int(JitterStartSeconds)))
×
338
        }
×
339

340
        headerFields := processHeader(c)
6✔
341
        if len(headerFields) != 0 {
6✔
342
                startRequest.Header = &types.Header{Fields: headerFields}
×
343
        }
×
344

345
        memoFields := processMemo(c)
6✔
346
        if len(memoFields) != 0 {
6✔
347
                startRequest.Memo = &types.Memo{Fields: memoFields}
×
348
        }
×
349

350
        searchAttrFields := processSearchAttr(c)
6✔
351
        if len(searchAttrFields) != 0 {
6✔
352
                startRequest.SearchAttributes = &types.SearchAttributes{IndexedFields: searchAttrFields}
×
353
        }
×
354

355
        return startRequest
6✔
356
}
357

358
func processSearchAttr(c *cli.Context) map[string][]byte {
6✔
359
        rawSearchAttrKey := c.String(FlagSearchAttributesKey)
6✔
360
        var searchAttrKeys []string
6✔
361
        if strings.TrimSpace(rawSearchAttrKey) != "" {
6✔
362
                searchAttrKeys = trimSpace(strings.Split(rawSearchAttrKey, searchAttrInputSeparator))
×
363
        }
×
364

365
        rawSearchAttrVal := c.String(FlagSearchAttributesVal)
6✔
366
        var searchAttrVals []interface{}
6✔
367
        if strings.TrimSpace(rawSearchAttrVal) != "" {
6✔
368
                searchAttrValsStr := trimSpace(strings.Split(rawSearchAttrVal, searchAttrInputSeparator))
×
369

×
370
                for _, v := range searchAttrValsStr {
×
371
                        searchAttrVals = append(searchAttrVals, convertStringToRealType(v))
×
372
                }
×
373
        }
374

375
        if len(searchAttrKeys) != len(searchAttrVals) {
6✔
376
                ErrorAndExit("Number of search attributes keys and values are not equal.", nil)
×
377
        }
×
378

379
        fields := map[string][]byte{}
6✔
380
        for i, key := range searchAttrKeys {
6✔
381
                val, err := json.Marshal(searchAttrVals[i])
×
382
                if err != nil {
×
383
                        ErrorAndExit(fmt.Sprintf("Encode value %v error", val), err)
×
384
                }
×
385
                fields[key] = val
×
386
        }
387

388
        return fields
6✔
389
}
390

391
func processHeader(c *cli.Context) map[string][]byte {
6✔
392
        headerKeys := processMultipleKeys(c.String(FlagHeaderKey), " ")
6✔
393
        headerValues := processMultipleJSONValues(processJSONInputHelper(c, jsonTypeHeader))
6✔
394

6✔
395
        if len(headerKeys) != len(headerValues) {
6✔
396
                ErrorAndExit("Number of header keys and values are not equal.", nil)
×
397
        }
×
398

399
        return mapFromKeysValues(headerKeys, headerValues)
6✔
400
}
401

402
func processMemo(c *cli.Context) map[string][]byte {
6✔
403
        memoKeys := processMultipleKeys(c.String(FlagMemoKey), " ")
6✔
404
        memoValues := processMultipleJSONValues(processJSONInputHelper(c, jsonTypeMemo))
6✔
405

6✔
406
        if len(memoKeys) != len(memoValues) {
6✔
407
                ErrorAndExit("Number of memo keys and values are not equal.", nil)
×
408
        }
×
409

410
        return mapFromKeysValues(memoKeys, memoValues)
6✔
411
}
412

413
// helper function to print workflow progress with time refresh every second
414
func printWorkflowProgress(c *cli.Context, domain, wid, rid string) {
7✔
415
        fmt.Println(colorMagenta("Progress:"))
7✔
416

7✔
417
        wfClient := getWorkflowClient(c)
7✔
418
        timeElapse := 1
7✔
419
        isTimeElapseExist := false
7✔
420
        doneChan := make(chan bool)
7✔
421
        var lastEvent *types.HistoryEvent // used for print result of this run
7✔
422
        ticker := time.NewTicker(time.Second).C
7✔
423

7✔
424
        tcCtx, cancel := newIndefiniteContext(c)
7✔
425
        defer cancel()
7✔
426

7✔
427
        showDetails := c.Bool(FlagShowDetail)
7✔
428
        var maxFieldLength int
7✔
429
        if c.IsSet(FlagMaxFieldLength) {
7✔
430
                maxFieldLength = c.Int(FlagMaxFieldLength)
×
431
        }
×
432

433
        go func() {
14✔
434
                iterator, err := GetWorkflowHistoryIterator(tcCtx, wfClient, domain, wid, rid, true, types.HistoryEventFilterTypeAllEvent.Ptr())
7✔
435
                if err != nil {
7✔
436
                        ErrorAndExit("Unable to get history events.", err)
×
437
                }
×
438
                for iterator.HasNext() {
14✔
439
                        entity, err := iterator.Next()
7✔
440
                        event := entity.(*types.HistoryEvent)
7✔
441
                        if err != nil {
7✔
442
                                ErrorAndExit("Unable to read event.", err)
×
443
                        }
×
444
                        if isTimeElapseExist {
7✔
445
                                removePrevious2LinesFromTerminal()
×
446
                                isTimeElapseExist = false
×
447
                        }
×
448
                        if showDetails {
9✔
449
                                fmt.Printf("  %d, %s, %s, %s\n", event.ID, convertTime(event.GetTimestamp(), false), ColorEvent(event), HistoryEventToString(event, true, maxFieldLength))
2✔
450
                        } else {
7✔
451
                                fmt.Printf("  %d, %s, %s\n", event.ID, convertTime(event.GetTimestamp(), false), ColorEvent(event))
5✔
452
                        }
5✔
453
                        lastEvent = event
7✔
454
                }
455
                doneChan <- true
7✔
456
        }()
457

458
        for {
14✔
459
                select {
7✔
460
                case <-ticker:
×
461
                        if isTimeElapseExist {
×
462
                                removePrevious2LinesFromTerminal()
×
463
                        }
×
464
                        fmt.Printf("\nTime elapse: %ds\n", timeElapse)
×
465
                        isTimeElapseExist = true
×
466
                        timeElapse++
×
467
                case <-doneChan: // print result of this run
7✔
468
                        fmt.Println(colorMagenta("\nResult:"))
7✔
469
                        fmt.Printf("  Run Time: %d seconds\n", timeElapse)
7✔
470
                        printRunStatus(lastEvent)
7✔
471
                        return
7✔
472
                }
473
        }
474
}
475

476
// TerminateWorkflow terminates a workflow execution
477
func TerminateWorkflow(c *cli.Context) {
2✔
478
        wfClient := getWorkflowClient(c)
2✔
479

2✔
480
        domain := getRequiredGlobalOption(c, FlagDomain)
2✔
481
        wid := getRequiredOption(c, FlagWorkflowID)
2✔
482
        rid := c.String(FlagRunID)
2✔
483
        reason := c.String(FlagReason)
2✔
484

2✔
485
        ctx, cancel := newContext(c)
2✔
486
        defer cancel()
2✔
487
        err := wfClient.TerminateWorkflowExecution(
2✔
488
                ctx,
2✔
489
                &types.TerminateWorkflowExecutionRequest{
2✔
490
                        Domain: domain,
2✔
491
                        Reason: reason,
2✔
492
                        WorkflowExecution: &types.WorkflowExecution{
2✔
493
                                WorkflowID: wid,
2✔
494
                                RunID:      rid,
2✔
495
                        }, Identity: getCliIdentity(),
2✔
496
                },
2✔
497
        )
2✔
498

2✔
499
        if err != nil {
3✔
500
                ErrorAndExit("Terminate workflow failed.", err)
1✔
501
        } else {
2✔
502
                fmt.Println("Terminate workflow succeeded.")
1✔
503
        }
1✔
504
}
505

506
// CancelWorkflow cancels a workflow execution
507
func CancelWorkflow(c *cli.Context) {
2✔
508
        wfClient := getWorkflowClient(c)
2✔
509

2✔
510
        domain := getRequiredGlobalOption(c, FlagDomain)
2✔
511
        wid := getRequiredOption(c, FlagWorkflowID)
2✔
512
        rid := c.String(FlagRunID)
2✔
513
        reason := c.String(FlagReason)
2✔
514

2✔
515
        ctx, cancel := newContext(c)
2✔
516
        defer cancel()
2✔
517

2✔
518
        err := wfClient.RequestCancelWorkflowExecution(
2✔
519
                ctx,
2✔
520
                &types.RequestCancelWorkflowExecutionRequest{
2✔
521
                        Domain: domain,
2✔
522
                        WorkflowExecution: &types.WorkflowExecution{
2✔
523
                                WorkflowID: wid,
2✔
524
                                RunID:      rid,
2✔
525
                        },
2✔
526
                        Identity: getCliIdentity(),
2✔
527
                        Cause:    reason,
2✔
528
                },
2✔
529
        )
2✔
530
        if err != nil {
3✔
531
                ErrorAndExit("Cancel workflow failed.", err)
1✔
532
        } else {
2✔
533
                fmt.Println("Cancel workflow succeeded.")
1✔
534
        }
1✔
535
}
536

537
// SignalWorkflow signals a workflow execution
538
func SignalWorkflow(c *cli.Context) {
2✔
539
        serviceClient := cFactory.ServerFrontendClient(c)
2✔
540

2✔
541
        domain := getRequiredGlobalOption(c, FlagDomain)
2✔
542
        wid := getRequiredOption(c, FlagWorkflowID)
2✔
543
        rid := c.String(FlagRunID)
2✔
544
        name := getRequiredOption(c, FlagName)
2✔
545
        input := processJSONInput(c)
2✔
546

2✔
547
        tcCtx, cancel := newContext(c)
2✔
548
        defer cancel()
2✔
549
        err := serviceClient.SignalWorkflowExecution(
2✔
550
                tcCtx,
2✔
551
                &types.SignalWorkflowExecutionRequest{
2✔
552
                        Domain: domain,
2✔
553
                        WorkflowExecution: &types.WorkflowExecution{
2✔
554
                                WorkflowID: wid,
2✔
555
                                RunID:      rid,
2✔
556
                        },
2✔
557
                        SignalName: name,
2✔
558
                        Input:      []byte(input),
2✔
559
                        Identity:   getCliIdentity(),
2✔
560
                        RequestID:  uuid.New(),
2✔
561
                },
2✔
562
        )
2✔
563

2✔
564
        if err != nil {
3✔
565
                ErrorAndExit("Signal workflow failed.", err)
1✔
566
        } else {
2✔
567
                fmt.Println("Signal workflow succeeded.")
1✔
568
        }
1✔
569
}
570

571
// SignalWithStartWorkflowExecution starts a workflow execution if not already exists and signals it
572
func SignalWithStartWorkflowExecution(c *cli.Context) {
×
573
        serviceClient := cFactory.ServerFrontendClient(c)
×
574

×
575
        signalWithStartRequest := constructSignalWithStartWorkflowRequest(c)
×
576

×
577
        tcCtx, cancel := newContext(c)
×
578
        defer cancel()
×
579

×
580
        resp, err := serviceClient.SignalWithStartWorkflowExecution(tcCtx, signalWithStartRequest)
×
581
        if err != nil {
×
582
                ErrorAndExit("SignalWithStart workflow failed.", err)
×
583
        } else {
×
584
                fmt.Printf("SignalWithStart workflow succeeded. Workflow Id: %s, run Id: %s\n", signalWithStartRequest.GetWorkflowID(), resp.GetRunID())
×
585
        }
×
586
}
587

588
func constructSignalWithStartWorkflowRequest(c *cli.Context) *types.SignalWithStartWorkflowExecutionRequest {
×
589
        startRequest := constructStartWorkflowRequest(c)
×
590

×
591
        return &types.SignalWithStartWorkflowExecutionRequest{
×
592
                Domain:                              startRequest.Domain,
×
593
                WorkflowID:                          startRequest.WorkflowID,
×
594
                WorkflowType:                        startRequest.WorkflowType,
×
595
                TaskList:                            startRequest.TaskList,
×
596
                Input:                               startRequest.Input,
×
597
                ExecutionStartToCloseTimeoutSeconds: startRequest.ExecutionStartToCloseTimeoutSeconds,
×
598
                TaskStartToCloseTimeoutSeconds:      startRequest.TaskStartToCloseTimeoutSeconds,
×
599
                Identity:                            startRequest.Identity,
×
600
                RequestID:                           startRequest.RequestID,
×
601
                WorkflowIDReusePolicy:               startRequest.WorkflowIDReusePolicy,
×
602
                RetryPolicy:                         startRequest.RetryPolicy,
×
603
                CronSchedule:                        startRequest.CronSchedule,
×
604
                Memo:                                startRequest.Memo,
×
605
                SearchAttributes:                    startRequest.SearchAttributes,
×
606
                Header:                              startRequest.Header,
×
607
                SignalName:                          getRequiredOption(c, FlagName),
×
608
                SignalInput:                         []byte(processJSONInputSignal(c)),
×
609
                DelayStartSeconds:                   startRequest.DelayStartSeconds,
×
610
                JitterStartSeconds:                  startRequest.JitterStartSeconds,
×
611
        }
×
612
}
×
613

614
func processJSONInputSignal(c *cli.Context) string {
×
615
        return processJSONInputHelper(c, jsonTypeSignal)
×
616
}
×
617

618
// QueryWorkflow query workflow execution
619
func QueryWorkflow(c *cli.Context) {
2✔
620
        getRequiredGlobalOption(c, FlagDomain) // for pre-check and alert if not provided
2✔
621
        getRequiredOption(c, FlagWorkflowID)
2✔
622
        queryType := getRequiredOption(c, FlagQueryType)
2✔
623

2✔
624
        queryWorkflowHelper(c, queryType)
2✔
625
}
2✔
626

627
// QueryWorkflowUsingStackTrace query workflow execution using __stack_trace as query type
628
func QueryWorkflowUsingStackTrace(c *cli.Context) {
1✔
629
        queryWorkflowHelper(c, "__stack_trace")
1✔
630
}
1✔
631

632
func queryWorkflowHelper(c *cli.Context, queryType string) {
3✔
633
        serviceClient := cFactory.ServerFrontendClient(c)
3✔
634

3✔
635
        domain := getRequiredGlobalOption(c, FlagDomain)
3✔
636
        wid := getRequiredOption(c, FlagWorkflowID)
3✔
637
        rid := c.String(FlagRunID)
3✔
638
        input := processJSONInput(c)
3✔
639

3✔
640
        tcCtx, cancel := newContext(c)
3✔
641
        defer cancel()
3✔
642
        queryRequest := &types.QueryWorkflowRequest{
3✔
643
                Domain: domain,
3✔
644
                Execution: &types.WorkflowExecution{
3✔
645
                        WorkflowID: wid,
3✔
646
                        RunID:      rid,
3✔
647
                },
3✔
648
                Query: &types.WorkflowQuery{
3✔
649
                        QueryType: queryType,
3✔
650
                },
3✔
651
        }
3✔
652
        if input != "" {
3✔
653
                queryRequest.Query.QueryArgs = []byte(input)
×
654
        }
×
655
        if c.IsSet(FlagQueryRejectCondition) {
3✔
656
                var rejectCondition types.QueryRejectCondition
×
657
                switch c.String(FlagQueryRejectCondition) {
×
658
                case "not_open":
×
659
                        rejectCondition = types.QueryRejectConditionNotOpen
×
660
                case "not_completed_cleanly":
×
661
                        rejectCondition = types.QueryRejectConditionNotCompletedCleanly
×
662
                default:
×
663
                        ErrorAndExit(fmt.Sprintf("invalid reject condition %v, valid values are \"not_open\" and \"not_completed_cleanly\"", c.String(FlagQueryRejectCondition)), nil)
×
664
                }
665
                queryRequest.QueryRejectCondition = &rejectCondition
×
666
        }
667
        if c.IsSet(FlagQueryConsistencyLevel) {
3✔
668
                var consistencyLevel types.QueryConsistencyLevel
×
669
                switch c.String(FlagQueryConsistencyLevel) {
×
670
                case "eventual":
×
671
                        consistencyLevel = types.QueryConsistencyLevelEventual
×
672
                case "strong":
×
673
                        consistencyLevel = types.QueryConsistencyLevelStrong
×
674
                default:
×
675
                        ErrorAndExit(fmt.Sprintf("invalid query consistency level %v, valid values are \"eventual\" and \"strong\"", c.String(FlagQueryConsistencyLevel)), nil)
×
676
                }
677
                queryRequest.QueryConsistencyLevel = &consistencyLevel
×
678
        }
679
        queryResponse, err := serviceClient.QueryWorkflow(tcCtx, queryRequest)
3✔
680
        if err != nil {
4✔
681
                ErrorAndExit("Query workflow failed.", err)
1✔
682
                return
1✔
683
        }
1✔
684

685
        if queryResponse.QueryRejected != nil {
2✔
686
                fmt.Printf("Query was rejected, workflow is in state: %v\n", *queryResponse.QueryRejected.CloseStatus)
×
687
        } else {
2✔
688
                // assume it is json encoded
2✔
689
                fmt.Print(string(queryResponse.QueryResult))
2✔
690
        }
2✔
691
}
692

693
// ListWorkflow list workflow executions based on filters
694
func ListWorkflow(c *cli.Context) {
8✔
695
        displayPagedWorkflows(c, listWorkflows(c), !c.Bool(FlagMore))
8✔
696
}
8✔
697

698
// ListAllWorkflow list all workflow executions based on filters
699
func ListAllWorkflow(c *cli.Context) {
×
700
        displayAllWorkflows(c, filterExcludedWorkflows(c, listWorkflows(c)))
×
701
}
×
702

703
// ScanAllWorkflow list all workflow executions using Scan API.
704
// It should be faster than ListAllWorkflow, but result are not sorted.
705
func ScanAllWorkflow(c *cli.Context) {
×
706
        displayAllWorkflows(c, scanWorkflows(c))
×
707
}
×
708

709
// openWorkflowQueryPattern matches a "CloseTime = missing" clause, allowing
// any number of spaces on either side of the equals sign. It is compiled once
// at package initialization instead of on every isQueryOpen call.
var openWorkflowQueryPattern = regexp.MustCompile(`CloseTime[ ]*=[ ]*missing`)

// isQueryOpen reports whether query contains a "CloseTime = missing" clause
// anywhere in its text.
func isQueryOpen(query string) bool {
	return openWorkflowQueryPattern.MatchString(query)
}
9✔
713

714
// CountWorkflow count number of workflows
715
func CountWorkflow(c *cli.Context) {
2✔
716
        wfClient := getWorkflowClient(c)
2✔
717

2✔
718
        domain := getRequiredGlobalOption(c, FlagDomain)
2✔
719
        query := c.String(FlagListQuery)
2✔
720
        request := &types.CountWorkflowExecutionsRequest{
2✔
721
                Domain: domain,
2✔
722
                Query:  query,
2✔
723
        }
2✔
724

2✔
725
        ctx, cancel := newContextForLongPoll(c)
2✔
726
        defer cancel()
2✔
727
        response, err := wfClient.CountWorkflowExecutions(ctx, request)
2✔
728
        if err != nil {
2✔
729
                ErrorAndExit("Failed to count workflow.", err)
×
730
        }
×
731

732
        fmt.Println(response.GetCount())
2✔
733
}
734

735
// ListArchivedWorkflow lists archived workflow executions based on filters
736
func ListArchivedWorkflow(c *cli.Context) {
1✔
737
        printAll := c.Bool(FlagAll)
1✔
738
        if printAll {
2✔
739
                displayAllWorkflows(c, listArchivedWorkflows(c))
1✔
740
        } else {
1✔
741
                displayPagedWorkflows(c, listArchivedWorkflows(c), false)
×
742
        }
×
743
}
744

745
// DescribeWorkflow show information about the specified workflow execution
746
func DescribeWorkflow(c *cli.Context) {
×
747
        wid := getRequiredOption(c, FlagWorkflowID)
×
748
        rid := c.String(FlagRunID)
×
749

×
750
        describeWorkflowHelper(c, wid, rid)
×
751
}
×
752

753
// DescribeWorkflowWithID show information about the specified workflow execution
754
func DescribeWorkflowWithID(c *cli.Context) {
×
755
        if !c.Args().Present() {
×
756
                ErrorAndExit("Argument workflow_id is required.", nil)
×
757
        }
×
758
        wid := c.Args().First()
×
759
        rid := ""
×
760
        if c.NArg() >= 2 {
×
761
                rid = c.Args().Get(1)
×
762
        }
×
763

764
        describeWorkflowHelper(c, wid, rid)
×
765
}
766

767
func describeWorkflowHelper(c *cli.Context, wid, rid string) {
×
768
        frontendClient := cFactory.ServerFrontendClient(c)
×
769
        domain := getRequiredGlobalOption(c, FlagDomain)
×
770
        printRaw := c.Bool(FlagPrintRaw) // printRaw is false by default,
×
771
        // and will show datetime and decoded search attributes instead of raw timestamp and byte arrays
×
772
        printResetPointsOnly := c.Bool(FlagResetPointsOnly)
×
773

×
774
        ctx, cancel := newContext(c)
×
775
        defer cancel()
×
776

×
777
        resp, err := frontendClient.DescribeWorkflowExecution(ctx, &types.DescribeWorkflowExecutionRequest{
×
778
                Domain: domain,
×
779
                Execution: &types.WorkflowExecution{
×
780
                        WorkflowID: wid,
×
781
                        RunID:      rid,
×
782
                },
×
783
        })
×
784
        if err != nil {
×
785
                ErrorAndExit("Describe workflow execution failed", err)
×
786
        }
×
787

788
        if printResetPointsOnly {
×
789
                printAutoResetPoints(resp)
×
790
                return
×
791
        }
×
792

793
        var o interface{}
×
794
        if printRaw {
×
795
                o = resp
×
796
        } else {
×
797
                o = convertDescribeWorkflowExecutionResponse(resp, frontendClient, c)
×
798
        }
×
799

800
        prettyPrintJSONObject(o)
×
801
}
802

803
// AutoResetPointRow is one row of the table built by printAutoResetPoints.
// The header tags carry the column titles.
type AutoResetPointRow struct {
	// BinaryChecksum is the binary checksum recorded on the reset point.
	BinaryChecksum string `header:"Binary Checksum"`
	// CreateTime is the reset point's creation time.
	CreateTime time.Time `header:"Create Time"`
	// RunID is the run ID recorded on the reset point.
	RunID string `header:"RunID"`
	// EventID is the ID of the first completed decision of the reset point.
	EventID int64 `header:"EventID"`
}
809

810
func printAutoResetPoints(resp *types.DescribeWorkflowExecutionResponse) {
×
811
        fmt.Println("Auto Reset Points:")
×
812
        table := []AutoResetPointRow{}
×
813
        if resp.WorkflowExecutionInfo.AutoResetPoints == nil || len(resp.WorkflowExecutionInfo.AutoResetPoints.Points) == 0 {
×
814
                return
×
815
        }
×
816
        for _, pt := range resp.WorkflowExecutionInfo.AutoResetPoints.Points {
×
817
                table = append(table, AutoResetPointRow{
×
818
                        BinaryChecksum: pt.GetBinaryChecksum(),
×
819
                        CreateTime:     time.Unix(0, pt.GetCreatedTimeNano()),
×
820
                        RunID:          pt.GetRunID(),
×
821
                        EventID:        pt.GetFirstDecisionCompletedID(),
×
822
                })
×
823
        }
×
824
        RenderTable(os.Stdout, table, RenderOptions{Color: true, Border: true, PrintDateTime: true})
×
825
}
826

827
// describeWorkflowExecutionResponse is used to print datetime instead of print raw time
828
type describeWorkflowExecutionResponse struct {
829
        ExecutionConfiguration *types.WorkflowExecutionConfiguration
830
        WorkflowExecutionInfo  workflowExecutionInfo
831
        PendingActivities      []*pendingActivityInfo
832
        PendingChildren        []*types.PendingChildExecutionInfo
833
        PendingDecision        *pendingDecisionInfo
834
}
835

836
// workflowExecutionInfo has same fields as types.WorkflowExecutionInfo, but has datetime instead of raw time
837
type workflowExecutionInfo struct {
838
        Execution        *types.WorkflowExecution
839
        Type             *types.WorkflowType
840
        StartTime        *string // change from *int64
841
        CloseTime        *string // change from *int64
842
        CloseStatus      *types.WorkflowExecutionCloseStatus
843
        HistoryLength    int64
844
        ParentDomainID   *string
845
        ParentExecution  *types.WorkflowExecution
846
        Memo             *types.Memo
847
        SearchAttributes map[string]interface{}
848
        AutoResetPoints  *types.ResetPoints
849
}
850

851
// pendingActivityInfo has same fields as types.PendingActivityInfo, but different field type for better display
852
type pendingActivityInfo struct {
853
        ActivityID             string
854
        ActivityType           *types.ActivityType
855
        State                  *types.PendingActivityState
856
        ScheduledTimestamp     *string `json:",omitempty"` // change from *int64
857
        LastStartedTimestamp   *string `json:",omitempty"` // change from *int64
858
        HeartbeatDetails       *string `json:",omitempty"` // change from []byte
859
        LastHeartbeatTimestamp *string `json:",omitempty"` // change from *int64
860
        Attempt                int32   `json:",omitempty"`
861
        MaximumAttempts        int32   `json:",omitempty"`
862
        ExpirationTimestamp    *string `json:",omitempty"` // change from *int64
863
        LastFailureReason      *string `json:",omitempty"`
864
        LastWorkerIdentity     string  `json:",omitempty"`
865
        LastFailureDetails     *string `json:",omitempty"` // change from []byte
866
}
867

868
type pendingDecisionInfo struct {
869
        State                      *types.PendingDecisionState
870
        OriginalScheduledTimestamp *string `json:",omitempty"` // change from *int64
871
        ScheduledTimestamp         *string `json:",omitempty"` // change from *int64
872
        StartedTimestamp           *string `json:",omitempty"` // change from *int64
873
        Attempt                    int64   `json:",omitempty"`
874
}
875

876
func convertDescribeWorkflowExecutionResponse(resp *types.DescribeWorkflowExecutionResponse,
877
        wfClient frontend.Client, c *cli.Context) *describeWorkflowExecutionResponse {
4✔
878

4✔
879
        info := resp.WorkflowExecutionInfo
4✔
880
        executionInfo := workflowExecutionInfo{
4✔
881
                Execution:        info.Execution,
4✔
882
                Type:             info.Type,
4✔
883
                StartTime:        common.StringPtr(convertTime(info.GetStartTime(), false)),
4✔
884
                CloseTime:        common.StringPtr(convertTime(info.GetCloseTime(), false)),
4✔
885
                CloseStatus:      info.CloseStatus,
4✔
886
                HistoryLength:    info.HistoryLength,
4✔
887
                ParentDomainID:   info.ParentDomainID,
4✔
888
                ParentExecution:  info.ParentExecution,
4✔
889
                Memo:             info.Memo,
4✔
890
                SearchAttributes: convertSearchAttributesToMapOfInterface(info.SearchAttributes, wfClient, c),
4✔
891
                AutoResetPoints:  info.AutoResetPoints,
4✔
892
        }
4✔
893

4✔
894
        var pendingActs []*pendingActivityInfo
4✔
895
        var tmpAct *pendingActivityInfo
4✔
896
        for _, pa := range resp.PendingActivities {
4✔
897
                tmpAct = &pendingActivityInfo{
×
898
                        ActivityID:             pa.ActivityID,
×
899
                        ActivityType:           pa.ActivityType,
×
900
                        State:                  pa.State,
×
901
                        ScheduledTimestamp:     timestampPtrToStringPtr(pa.ScheduledTimestamp, false),
×
902
                        LastStartedTimestamp:   timestampPtrToStringPtr(pa.LastStartedTimestamp, false),
×
903
                        LastHeartbeatTimestamp: timestampPtrToStringPtr(pa.LastHeartbeatTimestamp, false),
×
904
                        Attempt:                pa.Attempt,
×
905
                        MaximumAttempts:        pa.MaximumAttempts,
×
906
                        ExpirationTimestamp:    timestampPtrToStringPtr(pa.ExpirationTimestamp, false),
×
907
                        LastFailureReason:      pa.LastFailureReason,
×
908
                        LastWorkerIdentity:     pa.LastWorkerIdentity,
×
909
                }
×
910
                if pa.HeartbeatDetails != nil {
×
911
                        tmpAct.HeartbeatDetails = common.StringPtr(string(pa.HeartbeatDetails))
×
912
                }
×
913
                if pa.LastFailureDetails != nil {
×
914
                        tmpAct.LastFailureDetails = common.StringPtr(string(pa.LastFailureDetails))
×
915
                }
×
916
                pendingActs = append(pendingActs, tmpAct)
×
917
        }
918

919
        var pendingDecision *pendingDecisionInfo
4✔
920
        if resp.PendingDecision != nil {
4✔
921
                pendingDecision = &pendingDecisionInfo{
×
922
                        State:              resp.PendingDecision.State,
×
923
                        ScheduledTimestamp: timestampPtrToStringPtr(resp.PendingDecision.ScheduledTimestamp, false),
×
924
                        StartedTimestamp:   timestampPtrToStringPtr(resp.PendingDecision.StartedTimestamp, false),
×
925
                        Attempt:            resp.PendingDecision.Attempt,
×
926
                }
×
927
                // TODO: Idea here is only display decision task original scheduled timestamp if user are
×
928
                // using decision heartbeat. And we should be able to tell whether a decision task has heartbeat
×
929
                // or not by comparing the original scheduled timestamp and scheduled timestamp.
×
930
                // However, currently server may assign different value to original scheduled timestamp and
×
931
                // scheduled time even if there's no decision heartbeat.
×
932
                // if resp.PendingDecision.OriginalScheduledTimestamp != nil &&
×
933
                //         resp.PendingDecision.ScheduledTimestamp != nil &&
×
934
                //         *resp.PendingDecision.OriginalScheduledTimestamp != *resp.PendingDecision.ScheduledTimestamp {
×
935
                //         pendingDecision.OriginalScheduledTimestamp = timestampPtrToStringPtr(resp.PendingDecision.OriginalScheduledTimestamp, false)
×
936
                // }
×
937
        }
×
938

939
        return &describeWorkflowExecutionResponse{
4✔
940
                ExecutionConfiguration: resp.ExecutionConfiguration,
4✔
941
                WorkflowExecutionInfo:  executionInfo,
4✔
942
                PendingActivities:      pendingActs,
4✔
943
                PendingChildren:        resp.PendingChildren,
4✔
944
                PendingDecision:        pendingDecision,
4✔
945
        }
4✔
946
}
947

948
func convertSearchAttributesToMapOfInterface(searchAttributes *types.SearchAttributes,
949
        wfClient frontend.Client, c *cli.Context) map[string]interface{} {
4✔
950

4✔
951
        if searchAttributes == nil || len(searchAttributes.GetIndexedFields()) == 0 {
8✔
952
                return nil
4✔
953
        }
4✔
954

955
        result := make(map[string]interface{})
×
956
        ctx, cancel := newContext(c)
×
957
        defer cancel()
×
958
        validSearchAttributes, err := wfClient.GetSearchAttributes(ctx)
×
959
        if err != nil {
×
960
                ErrorAndExit("Error when get search attributes", err)
×
961
        }
×
962
        validKeys := validSearchAttributes.GetKeys()
×
963

×
964
        indexedFields := searchAttributes.GetIndexedFields()
×
965
        for k, v := range indexedFields {
×
966
                valueType := validKeys[k]
×
967
                deserializedValue, err := common.DeserializeSearchAttributeValue(v, valueType)
×
968
                if err != nil {
×
969
                        ErrorAndExit("Error deserializing search attribute value", err)
×
970
                }
×
971
                result[k] = deserializedValue
×
972
        }
973

974
        return result
×
975
}
976

977
func getAllWorkflowIDsByQuery(c *cli.Context, query string) map[string]bool {
×
978
        wfClient := getWorkflowClient(c)
×
979
        pageSize := 1000
×
980
        var nextPageToken []byte
×
981
        var info []*types.WorkflowExecutionInfo
×
982
        result := map[string]bool{}
×
983
        for {
×
984
                info, nextPageToken = scanWorkflowExecutions(wfClient, pageSize, nextPageToken, query, c)
×
985
                for _, we := range info {
×
986
                        wid := we.Execution.GetWorkflowID()
×
987
                        result[wid] = true
×
988
                }
×
989

990
                if nextPageToken == nil {
×
991
                        break
×
992
                }
993
        }
994
        return result
×
995
}
996

997
func printRunStatus(event *types.HistoryEvent) {
7✔
998
        switch event.GetEventType() {
7✔
999
        case types.EventTypeWorkflowExecutionCompleted:
×
1000
                fmt.Printf("  Status: %s\n", colorGreen("COMPLETED"))
×
1001
                fmt.Printf("  Output: %s\n", string(event.WorkflowExecutionCompletedEventAttributes.Result))
×
1002
        case types.EventTypeWorkflowExecutionFailed:
×
1003
                fmt.Printf("  Status: %s\n", colorRed("FAILED"))
×
1004
                fmt.Printf("  Reason: %s\n", event.WorkflowExecutionFailedEventAttributes.GetReason())
×
1005
                fmt.Printf("  Detail: %s\n", string(event.WorkflowExecutionFailedEventAttributes.Details))
×
1006
        case types.EventTypeWorkflowExecutionTimedOut:
×
1007
                fmt.Printf("  Status: %s\n", colorRed("TIMEOUT"))
×
1008
                fmt.Printf("  Timeout Type: %s\n", event.WorkflowExecutionTimedOutEventAttributes.GetTimeoutType())
×
1009
        case types.EventTypeWorkflowExecutionCanceled:
×
1010
                fmt.Printf("  Status: %s\n", colorRed("CANCELED"))
×
1011
                fmt.Printf("  Detail: %s\n", string(event.WorkflowExecutionCanceledEventAttributes.Details))
×
1012
        }
1013
}
1014

1015
// WorkflowRow is a presentation-layer entity used to render a table of
// workflows. The header tags carry the column titles.
type WorkflowRow struct {
	WorkflowType string `header:"Workflow Type" maxLength:"32"`
	WorkflowID   string `header:"Workflow ID"`
	RunID        string `header:"Run ID"`
	TaskList     string `header:"Task List"`
	IsCron       bool   `header:"Is Cron"`
	// StartTime, ExecutionTime and EndTime are built from the workflow's
	// nanosecond timestamps (see newWorkflowRow).
	StartTime     time.Time `header:"Start Time"`
	ExecutionTime time.Time `header:"Execution Time"`
	EndTime       time.Time `header:"End Time"`
	// Memo holds each memo field as a string.
	Memo map[string]string `header:"Memo"`
	// SearchAttributes holds each indexed field decoded from JSON.
	SearchAttributes map[string]interface{} `header:"Search Attributes"`
}
1028

1029
func newWorkflowRow(workflow *types.WorkflowExecutionInfo) WorkflowRow {
4✔
1030
        memo := map[string]string{}
4✔
1031
        for k, v := range workflow.Memo.GetFields() {
4✔
1032
                memo[k] = string(v)
×
1033
        }
×
1034

1035
        sa := map[string]interface{}{}
4✔
1036
        for k, v := range workflow.SearchAttributes.GetIndexedFields() {
4✔
1037
                var decodedVal interface{}
×
1038
                json.Unmarshal(v, &decodedVal)
×
1039
                sa[k] = decodedVal
×
1040
        }
×
1041

1042
        return WorkflowRow{
4✔
1043
                WorkflowType:     workflow.Type.GetName(),
4✔
1044
                WorkflowID:       workflow.Execution.GetWorkflowID(),
4✔
1045
                RunID:            workflow.Execution.GetRunID(),
4✔
1046
                TaskList:         workflow.TaskList,
4✔
1047
                IsCron:           workflow.IsCron,
4✔
1048
                StartTime:        time.Unix(0, workflow.GetStartTime()),
4✔
1049
                ExecutionTime:    time.Unix(0, workflow.GetExecutionTime()),
4✔
1050
                EndTime:          time.Unix(0, workflow.GetCloseTime()),
4✔
1051
                Memo:             memo,
4✔
1052
                SearchAttributes: sa,
4✔
1053
        }
4✔
1054
}
1055

1056
func workflowTableOptions(c *cli.Context) RenderOptions {
9✔
1057
        isScanQueryOpen := isQueryOpen(c.String(FlagListQuery))
9✔
1058

9✔
1059
        return RenderOptions{
9✔
1060
                DefaultTemplate: templateTable,
9✔
1061
                Color:           true,
9✔
1062
                PrintDateTime:   c.Bool(FlagPrintDateTime),
9✔
1063
                PrintRawTime:    c.Bool(FlagPrintRawTime),
9✔
1064
                OptionalColumns: map[string]bool{
9✔
1065
                        "End Time":          !(c.Bool(FlagOpen) || isScanQueryOpen),
9✔
1066
                        "Memo":              c.Bool(FlagPrintMemo),
9✔
1067
                        "Search Attributes": c.Bool(FlagPrintSearchAttr),
9✔
1068
                },
9✔
1069
        }
9✔
1070
}
9✔
1071

1072
// getWorkflowPageFn fetches one page of workflow executions. It takes the
// page token to continue from (nil for the first page) and returns the
// executions on that page together with the token for the next page; an empty
// token means there are no more pages.
type getWorkflowPageFn func([]byte) ([]*types.WorkflowExecutionInfo, []byte)
1073

1074
func getAllWorkflows(getWorkflowPage getWorkflowPageFn) []*types.WorkflowExecutionInfo {
1✔
1075
        var all, page []*types.WorkflowExecutionInfo
1✔
1076
        var nextPageToken []byte
1✔
1077
        for {
2✔
1078
                page, nextPageToken = getWorkflowPage(nextPageToken)
1✔
1079
                all = append(all, page...)
1✔
1080
                if len(nextPageToken) == 0 {
2✔
1081
                        break
1✔
1082
                }
1083
        }
1084
        return all
1✔
1085
}
1086

1087
func filterExcludedWorkflows(c *cli.Context, getWorkflowPage getWorkflowPageFn) getWorkflowPageFn {
×
1088
        excludeWIDs := map[string]bool{}
×
1089
        if c.IsSet(FlagListQuery) && c.IsSet(FlagExcludeWorkflowIDByQuery) {
×
1090
                excludeQuery := c.String(FlagExcludeWorkflowIDByQuery)
×
1091
                excludeWIDs = getAllWorkflowIDsByQuery(c, excludeQuery)
×
1092
                fmt.Printf("found %d workflowIDs to exclude\n", len(excludeWIDs))
×
1093
        }
×
1094

1095
        return func(nextPageToken []byte) ([]*types.WorkflowExecutionInfo, []byte) {
×
1096
                page, nextPageToken := getWorkflowPage(nextPageToken)
×
1097
                filtered := make([]*types.WorkflowExecutionInfo, 0, len(page))
×
1098
                for _, workflow := range page {
×
1099
                        if excludeWIDs[workflow.GetExecution().GetWorkflowID()] {
×
1100
                                continue
×
1101
                        }
1102
                        filtered = append(filtered, workflow)
×
1103
                }
1104
                return filtered, nextPageToken
×
1105
        }
1106
}
1107

1108
func displayPagedWorkflows(c *cli.Context, getWorkflowPage getWorkflowPageFn, firstPageOnly bool) {
8✔
1109
        var page []*types.WorkflowExecutionInfo
8✔
1110
        var nextPageToken []byte
8✔
1111
        for {
16✔
1112
                page, nextPageToken = getWorkflowPage(nextPageToken)
8✔
1113

8✔
1114
                displayWorkflows(c, page)
8✔
1115

8✔
1116
                if firstPageOnly {
16✔
1117
                        break
8✔
1118
                }
1119
                if len(nextPageToken) == 0 {
×
1120
                        break
×
1121
                }
1122
                if !showNextPage() {
×
1123
                        break
×
1124
                }
1125
        }
1126
}
1127

1128
func displayAllWorkflows(c *cli.Context, getWorkflowsPage getWorkflowPageFn) {
1✔
1129
        displayWorkflows(c, getAllWorkflows(getWorkflowsPage))
1✔
1130
}
1✔
1131

1132
func displayWorkflows(c *cli.Context, workflows []*types.WorkflowExecutionInfo) {
9✔
1133
        printJSON := c.Bool(FlagPrintJSON)
9✔
1134
        printDecodedRaw := c.Bool(FlagPrintFullyDetail)
9✔
1135
        if printJSON || printDecodedRaw {
9✔
1136
                fmt.Println("[")
×
1137
                printListResults(workflows, printJSON, false)
×
1138
                fmt.Println("]")
×
1139
        } else {
9✔
1140
                tableOptions := workflowTableOptions(c)
9✔
1141
                var table []WorkflowRow
9✔
1142
                for _, workflow := range workflows {
13✔
1143
                        table = append(table, newWorkflowRow(workflow))
4✔
1144
                }
4✔
1145
                Render(c, table, tableOptions)
9✔
1146
        }
1147
}
1148

1149
func listWorkflowExecutions(client frontend.Client, pageSize int, domain, query string, c *cli.Context) getWorkflowPageFn {
×
1150
        return func(nextPageToken []byte) ([]*types.WorkflowExecutionInfo, []byte) {
×
1151
                request := &types.ListWorkflowExecutionsRequest{
×
1152
                        Domain:        domain,
×
1153
                        PageSize:      int32(pageSize),
×
1154
                        NextPageToken: nextPageToken,
×
1155
                        Query:         query,
×
1156
                }
×
1157

×
1158
                ctx, cancel := newContextForLongPoll(c)
×
1159
                defer cancel()
×
1160
                response, err := client.ListWorkflowExecutions(ctx, request)
×
1161
                if err != nil {
×
1162
                        ErrorAndExit("Failed to list workflow.", err)
×
1163
                }
×
1164
                return response.Executions, response.NextPageToken
×
1165
        }
1166
}
1167

1168
func listOpenWorkflow(client frontend.Client, pageSize int, earliestTime, latestTime int64, domain, workflowID, workflowType string, c *cli.Context) getWorkflowPageFn {
7✔
1169
        return func(nextPageToken []byte) ([]*types.WorkflowExecutionInfo, []byte) {
14✔
1170
                request := &types.ListOpenWorkflowExecutionsRequest{
7✔
1171
                        Domain:          domain,
7✔
1172
                        MaximumPageSize: int32(pageSize),
7✔
1173
                        NextPageToken:   nextPageToken,
7✔
1174
                        StartTimeFilter: &types.StartTimeFilter{
7✔
1175
                                EarliestTime: common.Int64Ptr(earliestTime),
7✔
1176
                                LatestTime:   common.Int64Ptr(latestTime),
7✔
1177
                        },
7✔
1178
                }
7✔
1179
                if len(workflowID) > 0 {
8✔
1180
                        request.ExecutionFilter = &types.WorkflowExecutionFilter{WorkflowID: workflowID}
1✔
1181
                }
1✔
1182
                if len(workflowType) > 0 {
8✔
1183
                        request.TypeFilter = &types.WorkflowTypeFilter{Name: workflowType}
1✔
1184
                }
1✔
1185

1186
                ctx, cancel := newContextForLongPoll(c)
7✔
1187
                defer cancel()
7✔
1188
                response, err := client.ListOpenWorkflowExecutions(ctx, request)
7✔
1189
                if err != nil {
7✔
1190
                        ErrorAndExit("Failed to list open workflow.", err)
×
1191
                }
×
1192
                return response.Executions, response.NextPageToken
7✔
1193
        }
1194
}
1195

1196
func listClosedWorkflow(client frontend.Client, pageSize int, earliestTime, latestTime int64, domain, workflowID, workflowType string, workflowStatus types.WorkflowExecutionCloseStatus, c *cli.Context) getWorkflowPageFn {
10✔
1197
        return func(nextPageToken []byte) ([]*types.WorkflowExecutionInfo, []byte) {
20✔
1198
                request := &types.ListClosedWorkflowExecutionsRequest{
10✔
1199
                        Domain:          domain,
10✔
1200
                        MaximumPageSize: int32(pageSize),
10✔
1201
                        NextPageToken:   nextPageToken,
10✔
1202
                        StartTimeFilter: &types.StartTimeFilter{
10✔
1203
                                EarliestTime: common.Int64Ptr(earliestTime),
10✔
1204
                                LatestTime:   common.Int64Ptr(latestTime),
10✔
1205
                        },
10✔
1206
                }
10✔
1207
                if len(workflowID) > 0 {
11✔
1208
                        request.ExecutionFilter = &types.WorkflowExecutionFilter{WorkflowID: workflowID}
1✔
1209
                }
1✔
1210
                if len(workflowType) > 0 {
11✔
1211
                        request.TypeFilter = &types.WorkflowTypeFilter{Name: workflowType}
1✔
1212
                }
1✔
1213
                if workflowStatus != workflowStatusNotSet {
10✔
1214
                        request.StatusFilter = &workflowStatus
×
1215
                }
×
1216

1217
                ctx, cancel := newContextForLongPoll(c)
10✔
1218
                defer cancel()
10✔
1219
                response, err := client.ListClosedWorkflowExecutions(ctx, request)
10✔
1220
                if err != nil {
10✔
1221
                        ErrorAndExit("Failed to list closed workflow.", err)
×
1222
                }
×
1223
                return response.Executions, response.NextPageToken
10✔
1224
        }
1225
}
1226

1227
func listWorkflows(c *cli.Context) getWorkflowPageFn {
8✔
1228
        wfClient := getWorkflowClient(c)
8✔
1229

8✔
1230
        domain := getRequiredGlobalOption(c, FlagDomain)
8✔
1231
        earliestTime := parseTime(c.String(FlagEarliestTime), 0)
8✔
1232
        latestTime := parseTime(c.String(FlagLatestTime), time.Now().UnixNano())
8✔
1233
        workflowID := c.String(FlagWorkflowID)
8✔
1234
        workflowType := c.String(FlagWorkflowType)
8✔
1235
        queryOpen := c.Bool(FlagOpen)
8✔
1236
        pageSize := c.Int(FlagPageSize)
8✔
1237
        if pageSize <= 0 {
8✔
1238
                pageSize = defaultPageSizeForList
×
1239
        }
×
1240

1241
        var workflowStatus types.WorkflowExecutionCloseStatus
8✔
1242
        if c.IsSet(FlagWorkflowStatus) {
8✔
1243
                if queryOpen {
×
1244
                        ErrorAndExit(optionErr, errors.New("you can only filter on status for closed workflow, not open workflow"))
×
1245
                }
×
1246
                workflowStatus = getWorkflowStatus(c.String(FlagWorkflowStatus))
×
1247
        } else {
8✔
1248
                workflowStatus = workflowStatusNotSet
8✔
1249
        }
8✔
1250

1251
        if len(workflowID) > 0 && len(workflowType) > 0 {
8✔
1252
                ErrorAndExit(optionErr, errors.New("you can filter on workflow_id or workflow_type, but not on both"))
×
1253
        }
×
1254

1255
        if c.IsSet(FlagListQuery) {
8✔
1256
                listQuery := c.String(FlagListQuery)
×
1257
                return listWorkflowExecutions(wfClient, pageSize, domain, listQuery, c)
×
1258
        } else if queryOpen {
11✔
1259
                return listOpenWorkflow(wfClient, pageSize, earliestTime, latestTime, domain, workflowID, workflowType, c)
3✔
1260
        } else {
8✔
1261
                return listClosedWorkflow(wfClient, pageSize, earliestTime, latestTime, domain, workflowID, workflowType, workflowStatus, c)
5✔
1262
        }
5✔
1263
}
1264

1265
func listArchivedWorkflows(c *cli.Context) getWorkflowPageFn {
1✔
1266
        wfClient := getWorkflowClient(c)
1✔
1267

1✔
1268
        domain := getRequiredGlobalOption(c, FlagDomain)
1✔
1269
        pageSize := c.Int(FlagPageSize)
1✔
1270
        listQuery := getRequiredOption(c, FlagListQuery)
1✔
1271
        if pageSize <= 0 {
1✔
1272
                pageSize = defaultPageSizeForList
×
1273
        }
×
1274

1275
        contextTimeout := defaultContextTimeoutForListArchivedWorkflow
1✔
1276
        if c.GlobalIsSet(FlagContextTimeout) {
1✔
1277
                contextTimeout = time.Duration(c.GlobalInt(FlagContextTimeout)) * time.Second
×
1278
        }
×
1279

1280
        return func(nextPageToken []byte) ([]*types.WorkflowExecutionInfo, []byte) {
2✔
1281
                request := &types.ListArchivedWorkflowExecutionsRequest{
1✔
1282
                        Domain:        domain,
1✔
1283
                        PageSize:      int32(pageSize),
1✔
1284
                        Query:         listQuery,
1✔
1285
                        NextPageToken: nextPageToken,
1✔
1286
                }
1✔
1287

1✔
1288
                ctx, cancel := context.WithTimeout(context.Background(), contextTimeout)
1✔
1289
                defer cancel()
1✔
1290

1✔
1291
                result, err := wfClient.ListArchivedWorkflowExecutions(ctx, request)
1✔
1292
                if err != nil {
1✔
1293
                        cancel()
×
1294
                        ErrorAndExit("Failed to list archived workflow.", err)
×
1295
                }
×
1296
                return result.Executions, result.NextPageToken
1✔
1297
        }
1298
}
1299

1300
func scanWorkflows(c *cli.Context) getWorkflowPageFn {
×
1301
        wfClient := getWorkflowClient(c)
×
1302
        listQuery := c.String(FlagListQuery)
×
1303
        pageSize := c.Int(FlagPageSize)
×
1304
        if pageSize <= 0 {
×
1305
                pageSize = defaultPageSizeForScan
×
1306
        }
×
1307

1308
        return func(nextPageToken []byte) ([]*types.WorkflowExecutionInfo, []byte) {
×
1309
                return scanWorkflowExecutions(wfClient, pageSize, nextPageToken, listQuery, c)
×
1310
        }
×
1311
}
1312

1313
func scanWorkflowExecutions(client frontend.Client, pageSize int, nextPageToken []byte, query string, c *cli.Context) ([]*types.WorkflowExecutionInfo, []byte) {
×
1314
        domain := getRequiredGlobalOption(c, FlagDomain)
×
1315

×
1316
        request := &types.ListWorkflowExecutionsRequest{
×
1317
                Domain:        domain,
×
1318
                PageSize:      int32(pageSize),
×
1319
                NextPageToken: nextPageToken,
×
1320
                Query:         query,
×
1321
        }
×
1322
        ctx, cancel := newContextForLongPoll(c)
×
1323
        defer cancel()
×
1324
        response, err := client.ScanWorkflowExecutions(ctx, request)
×
1325
        if err != nil {
×
1326
                ErrorAndExit("Failed to list workflow.", err)
×
1327
        }
×
1328
        return response.Executions, response.NextPageToken
×
1329
}
1330

1331
func getWorkflowStatus(statusStr string) types.WorkflowExecutionCloseStatus {
×
1332
        if status, ok := workflowClosedStatusMap[strings.ToLower(statusStr)]; ok {
×
1333
                return status
×
1334
        }
×
1335
        ErrorAndExit(optionErr, errors.New("option status is not one of allowed values "+
×
1336
                "[completed, failed, canceled, terminated, continued_as_new, timed_out]"))
×
1337
        return 0
×
1338
}
1339

1340
func getWorkflowIDReusePolicy(value int) *types.WorkflowIDReusePolicy {
3✔
1341
        if value >= 0 && types.WorkflowIDReusePolicy(value) <= types.WorkflowIDReusePolicyTerminateIfRunning {
4✔
1342
                return types.WorkflowIDReusePolicy(value).Ptr()
1✔
1343
        }
1✔
1344
        // At this point, the policy should return if the value is valid
1345
        ErrorAndExit(fmt.Sprintf("Option %v value is not in supported range.", FlagWorkflowIDReusePolicy), nil)
2✔
1346
        return nil
2✔
1347
}
1348

1349
// default will print decoded raw
1350
func printListResults(executions []*types.WorkflowExecutionInfo, inJSON bool, more bool) {
×
1351
        for i, execution := range executions {
×
1352
                if inJSON {
×
1353
                        j, _ := json.Marshal(execution)
×
1354
                        if more || i < len(executions)-1 {
×
1355
                                fmt.Println(string(j) + ",")
×
1356
                        } else {
×
1357
                                fmt.Println(string(j))
×
1358
                        }
×
1359
                } else {
×
1360
                        if more || i < len(executions)-1 {
×
1361
                                fmt.Println(anyToString(execution, true, 0) + ",")
×
1362
                        } else {
×
1363
                                fmt.Println(anyToString(execution, true, 0))
×
1364
                        }
×
1365
                }
1366
        }
1367
}
1368

1369
// ObserveHistory show the process of running workflow
1370
func ObserveHistory(c *cli.Context) {
2✔
1371
        wid := getRequiredOption(c, FlagWorkflowID)
2✔
1372
        rid := c.String(FlagRunID)
2✔
1373
        domain := getRequiredGlobalOption(c, FlagDomain)
2✔
1374

2✔
1375
        printWorkflowProgress(c, domain, wid, rid)
2✔
1376
}
2✔
1377

1378
// ResetWorkflow reset workflow
1379
func ResetWorkflow(c *cli.Context) {
×
1380
        domain := getRequiredGlobalOption(c, FlagDomain)
×
1381
        wid := getRequiredOption(c, FlagWorkflowID)
×
1382
        reason := getRequiredOption(c, FlagReason)
×
1383
        if len(reason) == 0 {
×
1384
                ErrorAndExit("wrong reason", fmt.Errorf("reason cannot be empty"))
×
1385
        }
×
1386
        eventID := c.Int64(FlagEventID)
×
1387
        resetType := c.String(FlagResetType)
×
1388
        decisionOffset := c.Int(FlagDecisionOffset)
×
1389
        if decisionOffset > 0 {
×
1390
                ErrorAndExit("Only decision offset <=0 is supported", nil)
×
1391
        }
×
1392

1393
        extraForResetType, ok := resetTypesMap[resetType]
×
1394
        if !ok && eventID <= 0 {
×
1395
                ErrorAndExit("Must specify valid eventID or valid resetType", nil)
×
1396
        }
×
1397
        if ok && len(extraForResetType) > 0 {
×
1398
                getRequiredOption(c, extraForResetType)
×
1399
        }
×
1400

1401
        ctx, cancel := newContext(c)
×
1402
        defer cancel()
×
1403

×
1404
        frontendClient := cFactory.ServerFrontendClient(c)
×
1405
        rid := c.String(FlagRunID)
×
1406
        var err error
×
1407
        if rid == "" {
×
1408
                rid, err = getCurrentRunID(ctx, domain, wid, frontendClient)
×
1409
                if err != nil {
×
1410
                        ErrorAndExit("Cannot get latest RunID as default", err)
×
1411
                }
×
1412
        }
1413

1414
        resetBaseRunID := rid
×
1415
        decisionFinishID := eventID
×
1416
        if resetType != "" {
×
1417
                resetBaseRunID, decisionFinishID, err = getResetEventIDByType(ctx, c, resetType, decisionOffset, domain, wid, rid, frontendClient)
×
1418
                if err != nil {
×
1419
                        ErrorAndExit("getResetEventIDByType failed", err)
×
1420
                }
×
1421
        }
1422
        resp, err := frontendClient.ResetWorkflowExecution(ctx, &types.ResetWorkflowExecutionRequest{
×
1423
                Domain: domain,
×
1424
                WorkflowExecution: &types.WorkflowExecution{
×
1425
                        WorkflowID: wid,
×
1426
                        RunID:      resetBaseRunID,
×
1427
                },
×
1428
                Reason:                fmt.Sprintf("%v:%v", getCurrentUserFromEnv(), reason),
×
1429
                DecisionFinishEventID: decisionFinishID,
×
1430
                RequestID:             uuid.New(),
×
1431
                SkipSignalReapply:     c.Bool(FlagSkipSignalReapply),
×
1432
        })
×
1433
        if err != nil {
×
1434
                ErrorAndExit("reset failed", err)
×
1435
        }
×
1436
        prettyPrintJSONObject(resp)
×
1437
}
1438

1439
func processResets(c *cli.Context, domain string, wes chan types.WorkflowExecution, done chan bool, wg *sync.WaitGroup, params batchResetParamsType) {
×
1440
        for {
×
1441
                select {
×
1442
                case we := <-wes:
×
1443
                        fmt.Println("received: ", we.GetWorkflowID(), we.GetRunID())
×
1444
                        wid := we.GetWorkflowID()
×
1445
                        rid := we.GetRunID()
×
1446
                        var err error
×
1447
                        for i := 0; i < 3; i++ {
×
1448
                                err = doReset(c, domain, wid, rid, params)
×
1449
                                if err == nil {
×
1450
                                        break
×
1451
                                }
1452
                                if _, ok := err.(*types.BadRequestError); ok {
×
1453
                                        break
×
1454
                                }
1455
                                fmt.Println("failed and retry...: ", wid, rid, err)
×
1456
                                time.Sleep(time.Millisecond * time.Duration(rand.Intn(2000)))
×
1457
                        }
1458
                        time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
×
1459
                        if err != nil {
×
1460
                                fmt.Println("[ERROR] failed processing: ", wid, rid, err.Error())
×
1461
                        }
×
1462
                case <-done:
×
1463
                        wg.Done()
×
1464
                        return
×
1465
                }
1466
        }
1467
}
1468

1469
// batchResetParamsType bundles the options of a batch reset that are shared by
// every workflow processed in the batch.
type batchResetParamsType struct {
	// reason is the user-supplied reason recorded with each reset request.
	reason string

	// skipCurrentOpen skips workflows whose current run is still open.
	skipCurrentOpen bool
	// skipCurrentCompleted skips workflows whose current run has completed.
	skipCurrentCompleted bool
	// nonDeterministicOnly restricts the reset to workflows whose latest
	// decision failure looks like a non-determinism failure.
	nonDeterministicOnly bool
	// skipBaseNotCurrent skips workflows whose base run differs from the
	// current run.
	skipBaseNotCurrent bool
	// dryRun prints what would be reset without sending the reset request.
	dryRun bool

	// resetType selects how the reset point is determined.
	resetType string
	// decisionOffset is the offset applied when locating the last decision.
	decisionOffset int
	// skipSignalReapply is forwarded to the reset request.
	skipSignalReapply bool
}
1480

1481
// ResetInBatch resets workflow in batch
1482
func ResetInBatch(c *cli.Context) {
×
1483
        domain := getRequiredGlobalOption(c, FlagDomain)
×
1484
        resetType := getRequiredOption(c, FlagResetType)
×
1485
        decisionOffset := c.Int(FlagDecisionOffset)
×
1486
        if decisionOffset > 0 {
×
1487
                ErrorAndExit("Only decision offset <=0 is supported", nil)
×
1488
        }
×
1489

1490
        inFileName := c.String(FlagInputFile)
×
1491
        query := c.String(FlagListQuery)
×
1492
        excludeFileName := c.String(FlagExcludeFile)
×
1493
        excludeQuery := c.String(FlagExcludeWorkflowIDByQuery)
×
1494
        separator := c.String(FlagInputSeparator)
×
1495
        parallel := c.Int(FlagParallism)
×
1496

×
1497
        extraForResetType, ok := resetTypesMap[resetType]
×
1498
        if !ok {
×
1499
                ErrorAndExit("Not supported reset type", nil)
×
1500
        } else if len(extraForResetType) > 0 {
×
1501
                getRequiredOption(c, extraForResetType)
×
1502
        }
×
1503

1504
        if excludeFileName != "" && excludeQuery != "" {
×
1505
                ErrorAndExit("Only one of the excluding option is allowed", nil)
×
1506
        }
×
1507

1508
        batchResetParams := batchResetParamsType{
×
1509
                reason:               getRequiredOption(c, FlagReason),
×
1510
                skipCurrentOpen:      c.Bool(FlagSkipCurrentOpen),
×
1511
                skipCurrentCompleted: c.Bool(FlagSkipCurrentCompleted),
×
1512
                nonDeterministicOnly: c.Bool(FlagNonDeterministicOnly),
×
1513
                skipBaseNotCurrent:   c.Bool(FlagSkipBaseIsNotCurrent),
×
1514
                dryRun:               c.Bool(FlagDryRun),
×
1515
                resetType:            resetType,
×
1516
                decisionOffset:       decisionOffset,
×
1517
                skipSignalReapply:    c.Bool(FlagSkipSignalReapply),
×
1518
        }
×
1519

×
1520
        if inFileName == "" && query == "" {
×
1521
                ErrorAndExit("Must provide input file or list query to get target workflows to reset", nil)
×
1522
        }
×
1523

1524
        wg := &sync.WaitGroup{}
×
1525

×
1526
        wes := make(chan types.WorkflowExecution)
×
1527
        done := make(chan bool)
×
1528
        for i := 0; i < parallel; i++ {
×
1529
                wg.Add(1)
×
1530
                go processResets(c, domain, wes, done, wg, batchResetParams)
×
1531
        }
×
1532

1533
        // read excluded workflowIDs
1534
        excludeWIDs := map[string]bool{}
×
1535
        if excludeFileName != "" {
×
1536
                excludeWIDs = loadWorkflowIDsFromFile(excludeFileName, separator)
×
1537
        }
×
1538
        if excludeQuery != "" {
×
1539
                excludeWIDs = getAllWorkflowIDsByQuery(c, excludeQuery)
×
1540
        }
×
1541

1542
        fmt.Println("num of excluded WorkflowIDs:", len(excludeWIDs))
×
1543

×
1544
        if len(inFileName) > 0 {
×
1545
                inFile, err := os.Open(inFileName)
×
1546
                if err != nil {
×
1547
                        ErrorAndExit("Open failed", err)
×
1548
                }
×
1549
                defer inFile.Close()
×
1550
                scanner := bufio.NewScanner(inFile)
×
1551
                idx := 0
×
1552
                for scanner.Scan() {
×
1553
                        idx++
×
1554
                        line := strings.TrimSpace(scanner.Text())
×
1555
                        if len(line) == 0 {
×
1556
                                fmt.Printf("line %v is empty, skipped\n", idx)
×
1557
                                continue
×
1558
                        }
1559
                        cols := strings.Split(line, separator)
×
1560
                        if len(cols) < 1 {
×
1561
                                ErrorAndExit("Split failed", fmt.Errorf("line %v has less than 1 cols separated by comma, only %v ", idx, len(cols)))
×
1562
                        }
×
1563
                        fmt.Printf("Start processing line %v ...\n", idx)
×
1564
                        wid := strings.TrimSpace(cols[0])
×
1565
                        rid := ""
×
1566
                        if len(cols) > 1 {
×
1567
                                rid = strings.TrimSpace(cols[1])
×
1568
                        }
×
1569

1570
                        if excludeWIDs[wid] {
×
1571
                                fmt.Println("skip by exclude file: ", wid, rid)
×
1572
                                continue
×
1573
                        }
1574

1575
                        wes <- types.WorkflowExecution{
×
1576
                                WorkflowID: wid,
×
1577
                                RunID:      rid,
×
1578
                        }
×
1579
                }
1580
        } else {
×
1581
                wfClient := getWorkflowClient(c)
×
1582
                pageSize := 1000
×
1583
                var nextPageToken []byte
×
1584
                var result []*types.WorkflowExecutionInfo
×
1585
                for {
×
1586
                        result, nextPageToken = scanWorkflowExecutions(wfClient, pageSize, nextPageToken, query, c)
×
1587
                        for _, we := range result {
×
1588
                                wid := we.Execution.GetWorkflowID()
×
1589
                                rid := we.Execution.GetRunID()
×
1590
                                if excludeWIDs[wid] {
×
1591
                                        fmt.Println("skip by exclude file: ", wid, rid)
×
1592
                                        continue
×
1593
                                }
1594

1595
                                wes <- types.WorkflowExecution{
×
1596
                                        WorkflowID: wid,
×
1597
                                        RunID:      rid,
×
1598
                                }
×
1599
                        }
1600

1601
                        if nextPageToken == nil {
×
1602
                                break
×
1603
                        }
1604
                }
1605
        }
1606

1607
        close(done)
×
1608
        fmt.Println("wait for all goroutines...")
×
1609
        wg.Wait()
×
1610
}
1611

1612
func loadWorkflowIDsFromFile(excludeFileName, separator string) map[string]bool {
×
1613
        excludeWIDs := map[string]bool{}
×
1614
        if len(excludeFileName) > 0 {
×
1615
                // This code is only used in the CLI. The input provided is from a trusted user.
×
1616
                // #nosec
×
1617
                excFile, err := os.Open(excludeFileName)
×
1618
                if err != nil {
×
1619
                        ErrorAndExit("Open failed2", err)
×
1620
                }
×
1621
                defer excFile.Close()
×
1622
                scanner := bufio.NewScanner(excFile)
×
1623
                idx := 0
×
1624
                for scanner.Scan() {
×
1625
                        idx++
×
1626
                        line := strings.TrimSpace(scanner.Text())
×
1627
                        if len(line) == 0 {
×
1628
                                fmt.Printf("line %v is empty, skipped\n", idx)
×
1629
                                continue
×
1630
                        }
1631
                        cols := strings.Split(line, separator)
×
1632
                        if len(cols) < 1 {
×
1633
                                ErrorAndExit("Split failed", fmt.Errorf("line %v has less than 1 cols separated by comma, only %v ", idx, len(cols)))
×
1634
                        }
×
1635
                        wid := strings.TrimSpace(cols[0])
×
1636
                        excludeWIDs[wid] = true
×
1637
                }
1638
        }
1639
        return excludeWIDs
×
1640
}
1641

1642
// printErrorAndReturn prints msg on its own line to standard output and hands
// err back unchanged, so that a caller can log and return in one statement.
func printErrorAndReturn(msg string, err error) error {
	fmt.Fprintln(os.Stdout, msg)
	return err
}
×
1646

1647
func doReset(c *cli.Context, domain, wid, rid string, params batchResetParamsType) error {
×
1648
        ctx, cancel := newContext(c)
×
1649
        defer cancel()
×
1650

×
1651
        frontendClient := cFactory.ServerFrontendClient(c)
×
1652
        resp, err := frontendClient.DescribeWorkflowExecution(ctx, &types.DescribeWorkflowExecutionRequest{
×
1653
                Domain: domain,
×
1654
                Execution: &types.WorkflowExecution{
×
1655
                        WorkflowID: wid,
×
1656
                },
×
1657
        })
×
1658
        if err != nil {
×
1659
                return printErrorAndReturn("DescribeWorkflowExecution failed", err)
×
1660
        }
×
1661

1662
        currentRunID := resp.WorkflowExecutionInfo.Execution.GetRunID()
×
1663
        if currentRunID != rid && params.skipBaseNotCurrent {
×
1664
                fmt.Println("skip because base run is different from current run: ", wid, rid, currentRunID)
×
1665
                return nil
×
1666
        }
×
1667
        if rid == "" {
×
1668
                rid = currentRunID
×
1669
        }
×
1670

1671
        if resp.WorkflowExecutionInfo.CloseStatus == nil || resp.WorkflowExecutionInfo.CloseTime == nil {
×
1672
                if params.skipCurrentOpen {
×
1673
                        fmt.Println("skip because current run is open: ", wid, rid, currentRunID)
×
1674
                        return nil
×
1675
                }
×
1676
        }
1677

1678
        if resp.WorkflowExecutionInfo.GetCloseStatus() == types.WorkflowExecutionCloseStatusCompleted {
×
1679
                if params.skipCurrentCompleted {
×
1680
                        fmt.Println("skip because current run is completed: ", wid, rid, currentRunID)
×
1681
                        return nil
×
1682
                }
×
1683
        }
1684

1685
        if params.nonDeterministicOnly {
×
1686
                isLDN, err := isLastEventDecisionTaskFailedWithNonDeterminism(ctx, domain, wid, rid, frontendClient)
×
1687
                if err != nil {
×
1688
                        return printErrorAndReturn("check isLastEventDecisionTaskFailedWithNonDeterminism failed", err)
×
1689
                }
×
1690
                if !isLDN {
×
1691
                        fmt.Println("skip because last event is not DecisionTaskFailedWithNonDeterminism")
×
1692
                        return nil
×
1693
                }
×
1694
        }
1695

1696
        resetBaseRunID, decisionFinishID, err := getResetEventIDByType(ctx, c, params.resetType, params.decisionOffset, domain, wid, rid, frontendClient)
×
1697
        if err != nil {
×
1698
                return printErrorAndReturn("getResetEventIDByType failed", err)
×
1699
        }
×
1700
        fmt.Println("DecisionFinishEventId for reset:", wid, rid, resetBaseRunID, decisionFinishID)
×
1701

×
1702
        if params.dryRun {
×
1703
                fmt.Printf("dry run to reset wid: %v, rid:%v to baseRunID:%v, eventID:%v \n", wid, rid, resetBaseRunID, decisionFinishID)
×
1704
        } else {
×
1705
                resp2, err := frontendClient.ResetWorkflowExecution(ctx, &types.ResetWorkflowExecutionRequest{
×
1706
                        Domain: domain,
×
1707
                        WorkflowExecution: &types.WorkflowExecution{
×
1708
                                WorkflowID: wid,
×
1709
                                RunID:      resetBaseRunID,
×
1710
                        },
×
1711
                        DecisionFinishEventID: decisionFinishID,
×
1712
                        RequestID:             uuid.New(),
×
1713
                        Reason:                fmt.Sprintf("%v:%v", getCurrentUserFromEnv(), params.reason),
×
1714
                        SkipSignalReapply:     params.skipSignalReapply,
×
1715
                })
×
1716

×
1717
                if err != nil {
×
1718
                        return printErrorAndReturn("ResetWorkflowExecution failed", err)
×
1719
                }
×
1720
                fmt.Println("new runID for wid/rid is ,", wid, rid, resp2.GetRunID())
×
1721
        }
1722

1723
        return nil
×
1724
}
1725

1726
func isLastEventDecisionTaskFailedWithNonDeterminism(ctx context.Context, domain, wid, rid string, frontendClient frontend.Client) (bool, error) {
×
1727
        req := &types.GetWorkflowExecutionHistoryRequest{
×
1728
                Domain: domain,
×
1729
                Execution: &types.WorkflowExecution{
×
1730
                        WorkflowID: wid,
×
1731
                        RunID:      rid,
×
1732
                },
×
1733
                MaximumPageSize: 1000,
×
1734
                NextPageToken:   nil,
×
1735
        }
×
1736

×
1737
        var firstEvent, decisionFailed *types.HistoryEvent
×
1738
        for {
×
1739
                resp, err := frontendClient.GetWorkflowExecutionHistory(ctx, req)
×
1740
                if err != nil {
×
1741
                        return false, printErrorAndReturn("GetWorkflowExecutionHistory failed", err)
×
1742
                }
×
1743
                for _, e := range resp.GetHistory().GetEvents() {
×
1744
                        if firstEvent == nil {
×
1745
                                firstEvent = e
×
1746
                        }
×
1747
                        if e.GetEventType() == types.EventTypeDecisionTaskFailed {
×
1748
                                decisionFailed = e
×
1749
                        } else if e.GetEventType() == types.EventTypeDecisionTaskCompleted {
×
1750
                                decisionFailed = nil
×
1751
                        }
×
1752
                }
1753
                if len(resp.NextPageToken) != 0 {
×
1754
                        req.NextPageToken = resp.NextPageToken
×
1755
                } else {
×
1756
                        break
×
1757
                }
1758
        }
1759

1760
        if decisionFailed != nil {
×
1761
                attr := decisionFailed.GetDecisionTaskFailedEventAttributes()
×
1762
                if attr.GetCause() == types.DecisionTaskFailedCauseWorkflowWorkerUnhandledFailure ||
×
1763
                        strings.Contains(string(attr.GetDetails()), "nondeterministic") {
×
1764
                        fmt.Printf("found non-deterministic workflow wid:%v, rid:%v, originalStartTime:%v \n", wid, rid, time.Unix(0, firstEvent.GetTimestamp()))
×
1765
                        return true, nil
×
1766
                }
×
1767
        }
1768

1769
        return false, nil
×
1770
}
1771

1772
func getResetEventIDByType(
1773
        ctx context.Context,
1774
        c *cli.Context,
1775
        resetType string, decisionOffset int,
1776
        domain, wid, rid string,
1777
        frontendClient frontend.Client,
1778
) (resetBaseRunID string, decisionFinishID int64, err error) {
×
1779
        // default to the same runID
×
1780
        resetBaseRunID = rid
×
1781

×
1782
        fmt.Println("resetType:", resetType)
×
1783
        switch resetType {
×
1784
        case resetTypeLastDecisionCompleted:
×
1785
                decisionFinishID, err = getLastDecisionTaskByType(ctx, domain, wid, rid, frontendClient, types.EventTypeDecisionTaskCompleted, decisionOffset)
×
1786
                if err != nil {
×
1787
                        return
×
1788
                }
×
1789
        case resetTypeLastContinuedAsNew:
×
1790
                // this reset type may change the base runID
×
1791
                resetBaseRunID, decisionFinishID, err = getLastContinueAsNewID(ctx, domain, wid, rid, frontendClient)
×
1792
                if err != nil {
×
1793
                        return
×
1794
                }
×
1795
        case resetTypeFirstDecisionCompleted:
×
1796
                decisionFinishID, err = getFirstDecisionTaskByType(ctx, domain, wid, rid, frontendClient, types.EventTypeDecisionTaskCompleted)
×
1797
                if err != nil {
×
1798
                        return
×
1799
                }
×
1800
        case resetTypeBadBinary:
×
1801
                binCheckSum := c.String(FlagResetBadBinaryChecksum)
×
1802
                decisionFinishID, err = getBadDecisionCompletedID(ctx, domain, wid, rid, binCheckSum, frontendClient)
×
1803
                if err != nil {
×
1804
                        return
×
1805
                }
×
1806
        case resetTypeDecisionCompletedTime:
×
1807
                earliestTime := parseTime(c.String(FlagEarliestTime), 0)
×
1808
                decisionFinishID, err = getEarliestDecisionID(ctx, domain, wid, rid, earliestTime, frontendClient)
×
1809
                if err != nil {
×
1810
                        return
×
1811
                }
×
1812
        case resetTypeFirstDecisionScheduled:
×
1813
                decisionFinishID, err = getFirstDecisionTaskByType(ctx, domain, wid, rid, frontendClient, types.EventTypeDecisionTaskScheduled)
×
1814
                if err != nil {
×
1815
                        return
×
1816
                }
×
1817
                // decisionFinishID is exclusive in reset API
1818
                decisionFinishID++
×
1819
        case resetTypeLastDecisionScheduled:
×
1820
                decisionFinishID, err = getLastDecisionTaskByType(ctx, domain, wid, rid, frontendClient, types.EventTypeDecisionTaskScheduled, decisionOffset)
×
1821
                if err != nil {
×
1822
                        return
×
1823
                }
×
1824
                // decisionFinishID is exclusive in reset API
1825
                decisionFinishID++
×
1826
        default:
×
1827
                panic("not supported resetType")
×
1828
        }
1829
        return
×
1830
}
1831

1832
func getFirstDecisionTaskByType(
1833
        ctx context.Context,
1834
        domain string,
1835
        workflowID string,
1836
        runID string,
1837
        frontendClient frontend.Client,
1838
        decisionType types.EventType,
1839
) (decisionFinishID int64, err error) {
×
1840

×
1841
        req := &types.GetWorkflowExecutionHistoryRequest{
×
1842
                Domain: domain,
×
1843
                Execution: &types.WorkflowExecution{
×
1844
                        WorkflowID: workflowID,
×
1845
                        RunID:      runID,
×
1846
                },
×
1847
                MaximumPageSize: 1000,
×
1848
                NextPageToken:   nil,
×
1849
        }
×
1850

×
1851
        for {
×
1852
                resp, err := frontendClient.GetWorkflowExecutionHistory(ctx, req)
×
1853
                if err != nil {
×
1854
                        return 0, printErrorAndReturn("GetWorkflowExecutionHistory failed", err)
×
1855
                }
×
1856

1857
                for _, e := range resp.GetHistory().GetEvents() {
×
1858
                        if e.GetEventType() == decisionType {
×
1859
                                decisionFinishID = e.ID
×
1860
                                return decisionFinishID, nil
×
1861
                        }
×
1862
                }
1863

1864
                if len(resp.NextPageToken) != 0 {
×
1865
                        req.NextPageToken = resp.NextPageToken
×
1866
                } else {
×
1867
                        break
×
1868
                }
1869
        }
1870
        if decisionFinishID == 0 {
×
1871
                return 0, printErrorAndReturn("Get DecisionFinishID failed", fmt.Errorf("no DecisionFinishID"))
×
1872
        }
×
1873
        return
×
1874
}
1875

1876
func getCurrentRunID(ctx context.Context, domain, wid string, frontendClient frontend.Client) (string, error) {
×
1877
        resp, err := frontendClient.DescribeWorkflowExecution(ctx, &types.DescribeWorkflowExecutionRequest{
×
1878
                Domain: domain,
×
1879
                Execution: &types.WorkflowExecution{
×
1880
                        WorkflowID: wid,
×
1881
                },
×
1882
        })
×
1883
        if err != nil {
×
1884
                return "", err
×
1885
        }
×
1886
        return resp.WorkflowExecutionInfo.Execution.GetRunID(), nil
×
1887
}
1888

1889
func getBadDecisionCompletedID(ctx context.Context, domain, wid, rid, binChecksum string, frontendClient frontend.Client) (decisionFinishID int64, err error) {
×
1890
        resp, err := frontendClient.DescribeWorkflowExecution(ctx, &types.DescribeWorkflowExecutionRequest{
×
1891
                Domain: domain,
×
1892
                Execution: &types.WorkflowExecution{
×
1893
                        WorkflowID: wid,
×
1894
                        RunID:      rid,
×
1895
                },
×
1896
        })
×
1897
        if err != nil {
×
1898
                return 0, printErrorAndReturn("DescribeWorkflowExecution failed", err)
×
1899
        }
×
1900

1901
        _, p := execution.FindAutoResetPoint(clock.NewRealTimeSource(), &types.BadBinaries{
×
1902
                Binaries: map[string]*types.BadBinaryInfo{
×
1903
                        binChecksum: {},
×
1904
                },
×
1905
        }, resp.WorkflowExecutionInfo.AutoResetPoints)
×
1906
        if p != nil {
×
1907
                decisionFinishID = p.GetFirstDecisionCompletedID()
×
1908
        }
×
1909

1910
        if decisionFinishID == 0 {
×
1911
                return 0, printErrorAndReturn("Get DecisionFinishID failed", &types.BadRequestError{Message: "no DecisionFinishID"})
×
1912
        }
×
1913
        return
×
1914
}
1915

1916
func getLastDecisionTaskByType(
1917
        ctx context.Context,
1918
        domain string,
1919
        workflowID string,
1920
        runID string,
1921
        frontendClient frontend.Client,
1922
        decisionType types.EventType,
1923
        decisionOffset int,
1924
) (int64, error) {
×
1925

×
1926
        // this fixedSizeQueue is for remembering the offset decision eventID
×
1927
        fixedSizeQueue := make([]int64, 0)
×
1928
        size := int(math.Abs(float64(decisionOffset))) + 1
×
1929

×
1930
        req := &types.GetWorkflowExecutionHistoryRequest{
×
1931
                Domain: domain,
×
1932
                Execution: &types.WorkflowExecution{
×
1933
                        WorkflowID: workflowID,
×
1934
                        RunID:      runID,
×
1935
                },
×
1936
                MaximumPageSize: 1000,
×
1937
                NextPageToken:   nil,
×
1938
        }
×
1939

×
1940
        for {
×
1941
                resp, err := frontendClient.GetWorkflowExecutionHistory(ctx, req)
×
1942
                if err != nil {
×
1943
                        return 0, printErrorAndReturn("GetWorkflowExecutionHistory failed", err)
×
1944
                }
×
1945

1946
                for _, e := range resp.GetHistory().GetEvents() {
×
1947
                        if e.GetEventType() == decisionType {
×
1948
                                decisionEventID := e.ID
×
1949
                                fixedSizeQueue = append(fixedSizeQueue, decisionEventID)
×
1950
                                if len(fixedSizeQueue) > size {
×
1951
                                        fixedSizeQueue = fixedSizeQueue[1:]
×
1952
                                }
×
1953
                        }
1954
                }
1955

1956
                if len(resp.NextPageToken) != 0 {
×
1957
                        req.NextPageToken = resp.NextPageToken
×
1958
                } else {
×
1959
                        break
×
1960
                }
1961
        }
1962
        if len(fixedSizeQueue) == 0 {
×
1963
                return 0, printErrorAndReturn("Get DecisionFinishID failed", fmt.Errorf("no DecisionFinishID"))
×
1964
        }
×
1965
        return fixedSizeQueue[0], nil
×
1966
}
1967

1968
func getLastContinueAsNewID(ctx context.Context, domain, wid, rid string, frontendClient frontend.Client) (resetBaseRunID string, decisionFinishID int64, err error) {
×
1969
        // get first event
×
1970
        req := &types.GetWorkflowExecutionHistoryRequest{
×
1971
                Domain: domain,
×
1972
                Execution: &types.WorkflowExecution{
×
1973
                        WorkflowID: wid,
×
1974
                        RunID:      rid,
×
1975
                },
×
1976
                MaximumPageSize: 1,
×
1977
                NextPageToken:   nil,
×
1978
        }
×
1979
        resp, err := frontendClient.GetWorkflowExecutionHistory(ctx, req)
×
1980
        if err != nil {
×
1981
                return "", 0, printErrorAndReturn("GetWorkflowExecutionHistory failed", err)
×
1982
        }
×
1983
        firstEvent := resp.History.Events[0]
×
1984
        resetBaseRunID = firstEvent.GetWorkflowExecutionStartedEventAttributes().GetContinuedExecutionRunID()
×
1985
        if resetBaseRunID == "" {
×
1986
                return "", 0, printErrorAndReturn("GetWorkflowExecutionHistory failed", fmt.Errorf("cannot get resetBaseRunID"))
×
1987
        }
×
1988

1989
        req = &types.GetWorkflowExecutionHistoryRequest{
×
1990
                Domain: domain,
×
1991
                Execution: &types.WorkflowExecution{
×
1992
                        WorkflowID: wid,
×
1993
                        RunID:      resetBaseRunID,
×
1994
                },
×
1995
                MaximumPageSize: 1000,
×
1996
                NextPageToken:   nil,
×
1997
        }
×
1998
        for {
×
1999
                resp, err := frontendClient.GetWorkflowExecutionHistory(ctx, req)
×
2000
                if err != nil {
×
2001
                        return "", 0, printErrorAndReturn("GetWorkflowExecutionHistory failed", err)
×
2002
                }
×
2003
                for _, e := range resp.GetHistory().GetEvents() {
×
2004
                        if e.GetEventType() == types.EventTypeDecisionTaskCompleted {
×
2005
                                decisionFinishID = e.ID
×
2006
                        }
×
2007
                }
2008
                if len(resp.NextPageToken) != 0 {
×
2009
                        req.NextPageToken = resp.NextPageToken
×
2010
                } else {
×
2011
                        break
×
2012
                }
2013
        }
2014
        if decisionFinishID == 0 {
×
2015
                return "", 0, printErrorAndReturn("Get DecisionFinishID failed", fmt.Errorf("no DecisionFinishID"))
×
2016
        }
×
2017
        return
×
2018
}
2019

2020
// CompleteActivity completes an activity
2021
func CompleteActivity(c *cli.Context) {
×
2022
        domain := getRequiredGlobalOption(c, FlagDomain)
×
2023
        wid := getRequiredOption(c, FlagWorkflowID)
×
2024
        rid := getRequiredOption(c, FlagRunID)
×
2025
        activityID := getRequiredOption(c, FlagActivityID)
×
2026
        if len(activityID) == 0 {
×
2027
                ErrorAndExit("Invalid activityID", fmt.Errorf("activityID cannot be empty"))
×
2028
        }
×
2029
        result := getRequiredOption(c, FlagResult)
×
2030
        identity := getRequiredOption(c, FlagIdentity)
×
2031
        ctx, cancel := newContext(c)
×
2032
        defer cancel()
×
2033

×
2034
        frontendClient := cFactory.ServerFrontendClient(c)
×
2035
        err := frontendClient.RespondActivityTaskCompletedByID(ctx, &types.RespondActivityTaskCompletedByIDRequest{
×
2036
                Domain:     domain,
×
2037
                WorkflowID: wid,
×
2038
                RunID:      rid,
×
2039
                ActivityID: activityID,
×
2040
                Result:     []byte(result),
×
2041
                Identity:   identity,
×
2042
        })
×
2043
        if err != nil {
×
2044
                ErrorAndExit("Completing activity failed", err)
×
2045
        } else {
×
2046
                fmt.Println("Complete activity successfully.")
×
2047
        }
×
2048
}
2049

2050
// FailActivity fails an activity
2051
func FailActivity(c *cli.Context) {
×
2052
        domain := getRequiredGlobalOption(c, FlagDomain)
×
2053
        wid := getRequiredOption(c, FlagWorkflowID)
×
2054
        rid := getRequiredOption(c, FlagRunID)
×
2055
        activityID := getRequiredOption(c, FlagActivityID)
×
2056
        if len(activityID) == 0 {
×
2057
                ErrorAndExit("Invalid activityID", fmt.Errorf("activityID cannot be empty"))
×
2058
        }
×
2059
        reason := getRequiredOption(c, FlagReason)
×
2060
        detail := getRequiredOption(c, FlagDetail)
×
2061
        identity := getRequiredOption(c, FlagIdentity)
×
2062
        ctx, cancel := newContext(c)
×
2063
        defer cancel()
×
2064

×
2065
        frontendClient := cFactory.ServerFrontendClient(c)
×
2066
        err := frontendClient.RespondActivityTaskFailedByID(ctx, &types.RespondActivityTaskFailedByIDRequest{
×
2067
                Domain:     domain,
×
2068
                WorkflowID: wid,
×
2069
                RunID:      rid,
×
2070
                ActivityID: activityID,
×
2071
                Reason:     common.StringPtr(reason),
×
2072
                Details:    []byte(detail),
×
2073
                Identity:   identity,
×
2074
        })
×
2075
        if err != nil {
×
2076
                ErrorAndExit("Failing activity failed", err)
×
2077
        } else {
×
2078
                fmt.Println("Fail activity successfully.")
×
2079
        }
×
2080
}
2081

2082
// ObserveHistoryWithID show the process of running workflow
2083
func ObserveHistoryWithID(c *cli.Context) {
2✔
2084
        domain := getRequiredGlobalOption(c, FlagDomain)
2✔
2085
        if !c.Args().Present() {
2✔
2086
                ErrorAndExit("Argument workflow_id is required.", nil)
×
2087
        }
×
2088
        wid := c.Args().First()
2✔
2089
        rid := ""
2✔
2090
        if c.NArg() >= 2 {
2✔
2091
                rid = c.Args().Get(1)
×
2092
        }
×
2093

2094
        printWorkflowProgress(c, domain, wid, rid)
2✔
2095
}
2096

2097
func getEarliestDecisionID(
2098
        ctx context.Context,
2099
        domain string, wid string,
2100
        rid string, earliestTime int64,
2101
        frontendClient frontend.Client,
2102
) (decisionFinishID int64, err error) {
×
2103
        req := &types.GetWorkflowExecutionHistoryRequest{
×
2104
                Domain: domain,
×
2105
                Execution: &types.WorkflowExecution{
×
2106
                        WorkflowID: wid,
×
2107
                        RunID:      rid,
×
2108
                },
×
2109
                MaximumPageSize: 1000,
×
2110
                NextPageToken:   nil,
×
2111
        }
×
2112

×
2113
OuterLoop:
×
2114
        for {
×
2115
                resp, err := frontendClient.GetWorkflowExecutionHistory(ctx, req)
×
2116
                if err != nil {
×
2117
                        return 0, printErrorAndReturn("GetWorkflowExecutionHistory failed", err)
×
2118
                }
×
2119
                for _, e := range resp.GetHistory().GetEvents() {
×
2120
                        if e.GetEventType() == types.EventTypeDecisionTaskCompleted {
×
2121
                                if e.GetTimestamp() >= earliestTime {
×
2122
                                        decisionFinishID = e.ID
×
2123
                                        break OuterLoop
×
2124
                                }
2125
                        }
2126
                }
2127
                if len(resp.NextPageToken) != 0 {
×
2128
                        req.NextPageToken = resp.NextPageToken
×
2129
                } else {
×
2130
                        break
×
2131
                }
2132
        }
2133
        if decisionFinishID == 0 {
×
2134
                return 0, printErrorAndReturn("Get DecisionFinishID failed", fmt.Errorf("no DecisionFinishID"))
×
2135
        }
×
2136
        return
×
2137
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc