• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 018f9f5c-2963-46a5-8e77-e52888c66415

22 May 2024 08:12AM UTC coverage: 69.202% (-0.03%) from 69.232%
018f9f5c-2963-46a5-8e77-e52888c66415

push

buildkite

web-flow
Add method to list all workflow executions with support for partial match and search params (#6017)

* try

* update

* Update dataVisibilityManagerInterfaces.go

* updated

* Update pinot_visibility_store_test.go

* Update visibility_store_mock.go

* Update es_visibility_store_test.go

* Update pinot_visibility_store.go

77 of 89 new or added lines in 4 files covered. (86.52%)

80 existing lines in 21 files now uncovered.

101937 of 147303 relevant lines covered (69.2%)

2547.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.17
/common/persistence/sql/sql_visibility_store.go
1
// Copyright (c) 2017 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package sql
22

23
import (
24
        "context"
25
        "database/sql"
26
        "encoding/json"
27
        "fmt"
28
        "time"
29

30
        workflow "github.com/uber/cadence/.gen/go/shared"
31
        "github.com/uber/cadence/common"
32
        "github.com/uber/cadence/common/config"
33
        "github.com/uber/cadence/common/log"
34
        p "github.com/uber/cadence/common/persistence"
35
        "github.com/uber/cadence/common/persistence/sql/sqlplugin"
36
        "github.com/uber/cadence/common/types"
37
        "github.com/uber/cadence/common/types/mapper/thrift"
38
)
39

40
type (
41
        sqlVisibilityStore struct {
42
                sqlStore
43
        }
44

45
        visibilityPageToken struct {
46
                Time  time.Time
47
                RunID string
48
        }
49
)
50

51
// NewSQLVisibilityStore creates an instance of ExecutionStore
52
func NewSQLVisibilityStore(cfg config.SQL, logger log.Logger) (p.VisibilityStore, error) {
28✔
53
        db, err := NewSQLDB(&cfg)
28✔
54
        if err != nil {
29✔
55
                return nil, err
1✔
56
        }
1✔
57
        return &sqlVisibilityStore{
27✔
58
                sqlStore: sqlStore{
27✔
59
                        db:     db,
27✔
60
                        logger: logger,
27✔
61
                },
27✔
62
        }, nil
27✔
63
}
64

65
func (s *sqlVisibilityStore) RecordWorkflowExecutionStarted(
66
        ctx context.Context,
67
        request *p.InternalRecordWorkflowExecutionStartedRequest,
68
) error {
426✔
69
        _, err := s.db.InsertIntoVisibility(ctx, &sqlplugin.VisibilityRow{
426✔
70
                DomainID:         request.DomainUUID,
426✔
71
                WorkflowID:       request.WorkflowID,
426✔
72
                RunID:            request.RunID,
426✔
73
                StartTime:        request.StartTimestamp,
426✔
74
                ExecutionTime:    request.ExecutionTimestamp,
426✔
75
                WorkflowTypeName: request.WorkflowTypeName,
426✔
76
                Memo:             request.Memo.Data,
426✔
77
                Encoding:         string(request.Memo.GetEncoding()),
426✔
78
                IsCron:           request.IsCron,
426✔
79
                NumClusters:      request.NumClusters,
426✔
80
                UpdateTime:       request.UpdateTimestamp,
426✔
81
                ShardID:          request.ShardID,
426✔
82
        })
426✔
83

426✔
84
        if err != nil {
426✔
85
                return convertCommonErrors(s.db, "RecordWorkflowExecutionStarted", "", err)
×
86
        }
×
87
        return nil
426✔
88
}
89

90
func (s *sqlVisibilityStore) RecordWorkflowExecutionClosed(
91
        ctx context.Context,
92
        request *p.InternalRecordWorkflowExecutionClosedRequest,
93
) error {
323✔
94
        closeTime := request.CloseTimestamp
323✔
95
        result, err := s.db.ReplaceIntoVisibility(ctx, &sqlplugin.VisibilityRow{
323✔
96
                DomainID:         request.DomainUUID,
323✔
97
                WorkflowID:       request.WorkflowID,
323✔
98
                RunID:            request.RunID,
323✔
99
                StartTime:        request.StartTimestamp,
323✔
100
                ExecutionTime:    request.ExecutionTimestamp,
323✔
101
                WorkflowTypeName: request.WorkflowTypeName,
323✔
102
                CloseTime:        &closeTime,
323✔
103
                CloseStatus:      common.Int32Ptr(int32(*thrift.FromWorkflowExecutionCloseStatus(&request.Status))),
323✔
104
                HistoryLength:    &request.HistoryLength,
323✔
105
                Memo:             request.Memo.Data,
323✔
106
                Encoding:         string(request.Memo.GetEncoding()),
323✔
107
                IsCron:           request.IsCron,
323✔
108
                NumClusters:      request.NumClusters,
323✔
109
                UpdateTime:       request.UpdateTimestamp,
323✔
110
                ShardID:          request.ShardID,
323✔
111
        })
323✔
112
        if err != nil {
323✔
113
                return convertCommonErrors(s.db, "RecordWorkflowExecutionClosed", "", err)
×
114
        }
×
115
        noRowsAffected, err := result.RowsAffected()
323✔
116
        if err != nil {
323✔
117
                return &types.InternalServiceError{
×
118
                        Message: fmt.Sprintf("RecordWorkflowExecutionClosed rowsAffected error: %v", err),
×
119
                }
×
120
        }
×
121
        if noRowsAffected > 2 { // either adds a new row or deletes old row and adds new row
323✔
122
                return &types.InternalServiceError{
×
123
                        Message: fmt.Sprintf("RecordWorkflowExecutionClosed unexpected numRows (%v) updated", noRowsAffected),
×
124
                }
×
125
        }
×
126
        return nil
323✔
127
}
128

129
func (s *sqlVisibilityStore) RecordWorkflowExecutionUninitialized(
130
        ctx context.Context,
131
        request *p.InternalRecordWorkflowExecutionUninitializedRequest,
132
) error {
×
133
        // temporary: not implemented, only implemented for ES
×
134
        return nil
×
135
}
×
136

137
func (s *sqlVisibilityStore) UpsertWorkflowExecution(
138
        _ context.Context,
139
        request *p.InternalUpsertWorkflowExecutionRequest,
140
) error {
80✔
141
        if p.IsNopUpsertWorkflowRequest(request) {
80✔
142
                return nil
×
143
        }
×
144
        return p.ErrVisibilityOperationNotSupported
80✔
145
}
146

147
func (s *sqlVisibilityStore) ListOpenWorkflowExecutions(
148
        ctx context.Context,
149
        request *p.InternalListWorkflowExecutionsRequest,
150
) (*p.InternalListWorkflowExecutionsResponse, error) {
4✔
151
        return s.listWorkflowExecutions("ListOpenWorkflowExecutions", request.NextPageToken, request.EarliestTime, request.LatestTime,
4✔
152
                func(readLevel *visibilityPageToken) ([]sqlplugin.VisibilityRow, error) {
8✔
153
                        return s.db.SelectFromVisibility(ctx, &sqlplugin.VisibilityFilter{
4✔
154
                                DomainID:     request.DomainUUID,
4✔
155
                                MinStartTime: &request.EarliestTime,
4✔
156
                                MaxStartTime: &readLevel.Time,
4✔
157
                                RunID:        &readLevel.RunID,
4✔
158
                                PageSize:     &request.PageSize,
4✔
159
                        })
4✔
160
                })
4✔
161
}
162

163
func (s *sqlVisibilityStore) ListClosedWorkflowExecutions(
164
        ctx context.Context,
165
        request *p.InternalListWorkflowExecutionsRequest,
166
) (*p.InternalListWorkflowExecutionsResponse, error) {
8✔
167
        return s.listWorkflowExecutions("ListClosedWorkflowExecutions", request.NextPageToken, request.EarliestTime, request.LatestTime,
8✔
168
                func(readLevel *visibilityPageToken) ([]sqlplugin.VisibilityRow, error) {
16✔
169
                        return s.db.SelectFromVisibility(ctx, &sqlplugin.VisibilityFilter{
8✔
170
                                DomainID:     request.DomainUUID,
8✔
171
                                MinStartTime: &request.EarliestTime,
8✔
172
                                MaxStartTime: &readLevel.Time,
8✔
173
                                Closed:       true,
8✔
174
                                RunID:        &readLevel.RunID,
8✔
175
                                PageSize:     &request.PageSize,
8✔
176
                        })
8✔
177
                })
8✔
178
}
179

180
func (s *sqlVisibilityStore) ListOpenWorkflowExecutionsByType(
181
        ctx context.Context,
182
        request *p.InternalListWorkflowExecutionsByTypeRequest,
183
) (*p.InternalListWorkflowExecutionsResponse, error) {
×
184
        return s.listWorkflowExecutions("ListOpenWorkflowExecutionsByType", request.NextPageToken, request.EarliestTime, request.LatestTime,
×
185
                func(readLevel *visibilityPageToken) ([]sqlplugin.VisibilityRow, error) {
×
186
                        return s.db.SelectFromVisibility(ctx, &sqlplugin.VisibilityFilter{
×
187
                                DomainID:         request.DomainUUID,
×
188
                                MinStartTime:     &request.EarliestTime,
×
189
                                MaxStartTime:     &readLevel.Time,
×
190
                                RunID:            &readLevel.RunID,
×
191
                                WorkflowTypeName: &request.WorkflowTypeName,
×
192
                                PageSize:         &request.PageSize,
×
193
                        })
×
194
                })
×
195
}
196

197
func (s *sqlVisibilityStore) ListClosedWorkflowExecutionsByType(
198
        ctx context.Context,
199
        request *p.InternalListWorkflowExecutionsByTypeRequest,
200
) (*p.InternalListWorkflowExecutionsResponse, error) {
×
201
        return s.listWorkflowExecutions("ListClosedWorkflowExecutionsByType", request.NextPageToken, request.EarliestTime, request.LatestTime,
×
202
                func(readLevel *visibilityPageToken) ([]sqlplugin.VisibilityRow, error) {
×
203
                        return s.db.SelectFromVisibility(ctx, &sqlplugin.VisibilityFilter{
×
204
                                DomainID:         request.DomainUUID,
×
205
                                MinStartTime:     &request.EarliestTime,
×
206
                                MaxStartTime:     &readLevel.Time,
×
207
                                Closed:           true,
×
208
                                RunID:            &readLevel.RunID,
×
209
                                WorkflowTypeName: &request.WorkflowTypeName,
×
210
                                PageSize:         &request.PageSize,
×
211
                        })
×
212
                })
×
213
}
214

215
func (s *sqlVisibilityStore) ListOpenWorkflowExecutionsByWorkflowID(
216
        ctx context.Context,
217
        request *p.InternalListWorkflowExecutionsByWorkflowIDRequest,
218
) (*p.InternalListWorkflowExecutionsResponse, error) {
60✔
219
        return s.listWorkflowExecutions("ListOpenWorkflowExecutionsByWorkflowID", request.NextPageToken, request.EarliestTime, request.LatestTime,
60✔
220
                func(readLevel *visibilityPageToken) ([]sqlplugin.VisibilityRow, error) {
120✔
221
                        return s.db.SelectFromVisibility(ctx, &sqlplugin.VisibilityFilter{
60✔
222
                                DomainID:     request.DomainUUID,
60✔
223
                                MinStartTime: &request.EarliestTime,
60✔
224
                                MaxStartTime: &readLevel.Time,
60✔
225
                                RunID:        &readLevel.RunID,
60✔
226
                                WorkflowID:   &request.WorkflowID,
60✔
227
                                PageSize:     &request.PageSize,
60✔
228
                        })
60✔
229
                })
60✔
230
}
231

232
func (s *sqlVisibilityStore) ListClosedWorkflowExecutionsByWorkflowID(
233
        ctx context.Context,
234
        request *p.InternalListWorkflowExecutionsByWorkflowIDRequest,
235
) (*p.InternalListWorkflowExecutionsResponse, error) {
10✔
236
        return s.listWorkflowExecutions("ListClosedWorkflowExecutionsByWorkflowID", request.NextPageToken, request.EarliestTime, request.LatestTime,
10✔
237
                func(readLevel *visibilityPageToken) ([]sqlplugin.VisibilityRow, error) {
20✔
238
                        return s.db.SelectFromVisibility(ctx, &sqlplugin.VisibilityFilter{
10✔
239
                                DomainID:     request.DomainUUID,
10✔
240
                                MinStartTime: &request.EarliestTime,
10✔
241
                                MaxStartTime: &readLevel.Time,
10✔
242
                                Closed:       true,
10✔
243
                                RunID:        &readLevel.RunID,
10✔
244
                                WorkflowID:   &request.WorkflowID,
10✔
245
                                PageSize:     &request.PageSize,
10✔
246
                        })
10✔
247
                })
10✔
248
}
249

250
func (s *sqlVisibilityStore) ListClosedWorkflowExecutionsByStatus(
251
        ctx context.Context,
252
        request *p.InternalListClosedWorkflowExecutionsByStatusRequest,
253
) (*p.InternalListWorkflowExecutionsResponse, error) {
×
254
        return s.listWorkflowExecutions("ListClosedWorkflowExecutionsByStatus", request.NextPageToken, request.EarliestTime, request.LatestTime,
×
255
                func(readLevel *visibilityPageToken) ([]sqlplugin.VisibilityRow, error) {
×
256
                        return s.db.SelectFromVisibility(ctx, &sqlplugin.VisibilityFilter{
×
257
                                DomainID:     request.DomainUUID,
×
258
                                MinStartTime: &request.EarliestTime,
×
259
                                MaxStartTime: &readLevel.Time,
×
260
                                Closed:       true,
×
261
                                RunID:        &readLevel.RunID,
×
262
                                CloseStatus:  common.Int32Ptr(int32(*thrift.FromWorkflowExecutionCloseStatus(&request.Status))),
×
263
                                PageSize:     &request.PageSize,
×
264
                        })
×
265
                })
×
266
}
267

268
func (s *sqlVisibilityStore) GetClosedWorkflowExecution(
269
        ctx context.Context,
270
        request *p.InternalGetClosedWorkflowExecutionRequest,
271
) (*p.InternalGetClosedWorkflowExecutionResponse, error) {
×
272
        execution := request.Execution
×
273
        rows, err := s.db.SelectFromVisibility(ctx, &sqlplugin.VisibilityFilter{
×
274
                DomainID: request.DomainUUID,
×
275
                Closed:   true,
×
276
                RunID:    &execution.RunID,
×
277
        })
×
278
        if err != nil {
×
279
                if err == sql.ErrNoRows {
×
280
                        return nil, &types.EntityNotExistsError{
×
281
                                Message: fmt.Sprintf("Workflow execution not found.  WorkflowId: %v, RunId: %v",
×
282
                                        execution.GetWorkflowID(), execution.GetRunID()),
×
283
                        }
×
284
                }
×
285
                return nil, convertCommonErrors(s.db, "GetClosedWorkflowExecution", "", err)
×
286
        }
287
        rows[0].DomainID = request.DomainUUID
×
288
        rows[0].RunID = execution.GetRunID()
×
289
        rows[0].WorkflowID = execution.GetWorkflowID()
×
290
        return &p.InternalGetClosedWorkflowExecutionResponse{Execution: s.rowToInfo(&rows[0])}, nil
×
291
}
292

293
func (s *sqlVisibilityStore) DeleteWorkflowExecution(
294
        ctx context.Context,
295
        request *p.VisibilityDeleteWorkflowExecutionRequest,
296
) error {
36✔
297
        _, err := s.db.DeleteFromVisibility(ctx, &sqlplugin.VisibilityFilter{
36✔
298
                DomainID: request.DomainID,
36✔
299
                RunID:    &request.RunID,
36✔
300
        })
36✔
301
        if err != nil {
36✔
302
                return convertCommonErrors(s.db, "DeleteWorkflowExecution", "", err)
×
303
        }
×
304
        return nil
36✔
305
}
306

307
func (s *sqlVisibilityStore) DeleteUninitializedWorkflowExecution(
308
        ctx context.Context,
309
        request *p.VisibilityDeleteWorkflowExecutionRequest,
310
) error {
×
311
        // temporary: not implemented, only implemented for ES
×
312
        return nil
×
313
}
×
314

315
func (s *sqlVisibilityStore) ListWorkflowExecutions(
316
        _ context.Context,
317
        _ *p.ListWorkflowExecutionsByQueryRequest,
318
) (*p.InternalListWorkflowExecutionsResponse, error) {
×
319
        return nil, p.ErrVisibilityOperationNotSupported
×
320
}
×
321

322
func (s *sqlVisibilityStore) ListAllWorkflowExecutions(
323
        _ context.Context,
324
        _ *p.InternalListAllWorkflowExecutionsByTypeRequest,
NEW
325
) (*p.InternalListWorkflowExecutionsResponse, error) {
×
NEW
326
        return nil, p.ErrVisibilityOperationNotSupported
×
NEW
327
}
×
328

329
func (s *sqlVisibilityStore) ScanWorkflowExecutions(
330
        _ context.Context,
331
        _ *p.ListWorkflowExecutionsByQueryRequest,
332
) (*p.InternalListWorkflowExecutionsResponse, error) {
×
333
        return nil, p.ErrVisibilityOperationNotSupported
×
334
}
×
335

336
func (s *sqlVisibilityStore) CountWorkflowExecutions(
337
        _ context.Context,
338
        _ *p.CountWorkflowExecutionsRequest,
339
) (*p.CountWorkflowExecutionsResponse, error) {
×
340
        return nil, p.ErrVisibilityOperationNotSupported
×
341
}
×
342

343
func (s *sqlVisibilityStore) rowToInfo(row *sqlplugin.VisibilityRow) *p.InternalVisibilityWorkflowExecutionInfo {
90✔
344
        if row.ExecutionTime.UnixNano() == 0 {
154✔
345
                row.ExecutionTime = row.StartTime
64✔
346
        }
64✔
347
        info := &p.InternalVisibilityWorkflowExecutionInfo{
90✔
348
                WorkflowID:    row.WorkflowID,
90✔
349
                RunID:         row.RunID,
90✔
350
                TypeName:      row.WorkflowTypeName,
90✔
351
                StartTime:     row.StartTime,
90✔
352
                ExecutionTime: row.ExecutionTime,
90✔
353
                IsCron:        row.IsCron,
90✔
354
                NumClusters:   row.NumClusters,
90✔
355
                Memo:          p.NewDataBlob(row.Memo, common.EncodingType(row.Encoding)),
90✔
356
                UpdateTime:    row.UpdateTime,
90✔
357
                ShardID:       row.ShardID,
90✔
358
        }
90✔
359
        if row.CloseStatus != nil {
124✔
360
                status := workflow.WorkflowExecutionCloseStatus(*row.CloseStatus)
34✔
361
                info.Status = thrift.ToWorkflowExecutionCloseStatus(&status)
34✔
362
                info.CloseTime = *row.CloseTime
34✔
363
                info.HistoryLength = *row.HistoryLength
34✔
364
        }
34✔
365
        return info
90✔
366
}
367

368
func (s *sqlVisibilityStore) listWorkflowExecutions(opName string, pageToken []byte, earliestTime time.Time, latestTime time.Time, selectOp func(readLevel *visibilityPageToken) ([]sqlplugin.VisibilityRow, error)) (*p.InternalListWorkflowExecutionsResponse, error) {
82✔
369
        var readLevel *visibilityPageToken
82✔
370
        var err error
82✔
371
        if len(pageToken) > 0 {
82✔
372
                readLevel, err = s.deserializePageToken(pageToken)
×
373
                if err != nil {
×
374
                        return nil, err
×
375
                }
×
376
        } else {
82✔
377
                readLevel = &visibilityPageToken{Time: latestTime, RunID: ""}
82✔
378
        }
82✔
379
        rows, err := selectOp(readLevel)
82✔
380
        if err != nil {
82✔
381
                return nil, convertCommonErrors(s.db, opName, "", err)
×
382
        }
×
383
        if len(rows) == 0 {
90✔
384
                return &p.InternalListWorkflowExecutionsResponse{}, nil
8✔
385
        }
8✔
386

387
        var infos = make([]*p.InternalVisibilityWorkflowExecutionInfo, len(rows))
74✔
388
        for i, row := range rows {
164✔
389
                infos[i] = s.rowToInfo(&row)
90✔
390
        }
90✔
391
        var nextPageToken []byte
74✔
392
        lastRow := rows[len(rows)-1]
74✔
393
        lastStartTime := lastRow.StartTime
74✔
394
        if lastStartTime.Sub(earliestTime).Nanoseconds() > 0 {
148✔
395
                nextPageToken, err = s.serializePageToken(&visibilityPageToken{
74✔
396
                        Time:  lastStartTime,
74✔
397
                        RunID: lastRow.RunID,
74✔
398
                })
74✔
399
                if err != nil {
74✔
400
                        return nil, err
×
401
                }
×
402
        }
403
        return &p.InternalListWorkflowExecutionsResponse{
74✔
404
                Executions:    infos,
74✔
405
                NextPageToken: nextPageToken,
74✔
406
        }, nil
74✔
407
}
408

409
func (s *sqlVisibilityStore) deserializePageToken(data []byte) (*visibilityPageToken, error) {
×
410
        var token visibilityPageToken
×
411
        err := json.Unmarshal(data, &token)
×
412
        return &token, err
×
413
}
×
414

415
func (s *sqlVisibilityStore) serializePageToken(token *visibilityPageToken) ([]byte, error) {
74✔
416
        data, err := json.Marshal(token)
74✔
417
        return data, err
74✔
418
}
74✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc