• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 0186bdce-5e78-4bd9-9a33-eb4f2319ec8d

07 Mar 2023 09:06PM UTC coverage: 57.058% (-0.1%) from 57.174%
0186bdce-5e78-4bd9-9a33-eb4f2319ec8d

push

buildkite

GitHub
move sample logger into persistence metric client for cleanness (#5129)

12 of 12 new or added lines in 5 files covered. (100.0%)

85162 of 149256 relevant lines covered (57.06%)

2249.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.11
/common/persistence/sql/sqlVisibilityStore.go
1
// Copyright (c) 2017 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package sql
22

23
import (
24
        "context"
25
        "database/sql"
26
        "encoding/json"
27
        "fmt"
28
        "time"
29

30
        workflow "github.com/uber/cadence/.gen/go/shared"
31
        "github.com/uber/cadence/common"
32
        "github.com/uber/cadence/common/config"
33
        "github.com/uber/cadence/common/log"
34
        p "github.com/uber/cadence/common/persistence"
35
        "github.com/uber/cadence/common/persistence/sql/sqlplugin"
36
        "github.com/uber/cadence/common/types"
37
        "github.com/uber/cadence/common/types/mapper/thrift"
38
)
39

40
type (
41
        sqlVisibilityStore struct {
42
                sqlStore
43
        }
44

45
        visibilityPageToken struct {
46
                Time  time.Time
47
                RunID string
48
        }
49
)
50

51
// NewSQLVisibilityStore creates an instance of ExecutionStore
52
func NewSQLVisibilityStore(cfg config.SQL, logger log.Logger) (p.VisibilityStore, error) {
18✔
53
        db, err := NewSQLDB(&cfg)
18✔
54
        if err != nil {
18✔
55
                return nil, err
×
56
        }
×
57
        return &sqlVisibilityStore{
18✔
58
                sqlStore: sqlStore{
18✔
59
                        db:     db,
18✔
60
                        logger: logger,
18✔
61
                },
18✔
62
        }, nil
18✔
63
}
64

65
func (s *sqlVisibilityStore) RecordWorkflowExecutionStarted(
66
        ctx context.Context,
67
        request *p.InternalRecordWorkflowExecutionStartedRequest,
68
) error {
423✔
69
        _, err := s.db.InsertIntoVisibility(ctx, &sqlplugin.VisibilityRow{
423✔
70
                DomainID:         request.DomainUUID,
423✔
71
                WorkflowID:       request.WorkflowID,
423✔
72
                RunID:            request.RunID,
423✔
73
                StartTime:        request.StartTimestamp,
423✔
74
                ExecutionTime:    request.ExecutionTimestamp,
423✔
75
                WorkflowTypeName: request.WorkflowTypeName,
423✔
76
                Memo:             request.Memo.Data,
423✔
77
                Encoding:         string(request.Memo.GetEncoding()),
423✔
78
                IsCron:           request.IsCron,
423✔
79
                NumClusters:      request.NumClusters,
423✔
80
                UpdateTime:       request.UpdateTimestamp,
423✔
81
                ShardID:          request.ShardID,
423✔
82
        })
423✔
83

423✔
84
        if err != nil {
423✔
85
                return convertCommonErrors(s.db, "RecordWorkflowExecutionStarted", "", err)
×
86
        }
×
87
        return nil
423✔
88
}
89

90
func (s *sqlVisibilityStore) RecordWorkflowExecutionClosed(
91
        ctx context.Context,
92
        request *p.InternalRecordWorkflowExecutionClosedRequest,
93
) error {
325✔
94
        closeTime := request.CloseTimestamp
325✔
95
        result, err := s.db.ReplaceIntoVisibility(ctx, &sqlplugin.VisibilityRow{
325✔
96
                DomainID:         request.DomainUUID,
325✔
97
                WorkflowID:       request.WorkflowID,
325✔
98
                RunID:            request.RunID,
325✔
99
                StartTime:        request.StartTimestamp,
325✔
100
                ExecutionTime:    request.ExecutionTimestamp,
325✔
101
                WorkflowTypeName: request.WorkflowTypeName,
325✔
102
                CloseTime:        &closeTime,
325✔
103
                CloseStatus:      common.Int32Ptr(int32(*thrift.FromWorkflowExecutionCloseStatus(&request.Status))),
325✔
104
                HistoryLength:    &request.HistoryLength,
325✔
105
                Memo:             request.Memo.Data,
325✔
106
                Encoding:         string(request.Memo.GetEncoding()),
325✔
107
                IsCron:           request.IsCron,
325✔
108
                NumClusters:      request.NumClusters,
325✔
109
                UpdateTime:       request.UpdateTimestamp,
325✔
110
                ShardID:          request.ShardID,
325✔
111
        })
325✔
112
        if err != nil {
325✔
113
                return convertCommonErrors(s.db, "RecordWorkflowExecutionClosed", "", err)
×
114
        }
×
115
        noRowsAffected, err := result.RowsAffected()
325✔
116
        if err != nil {
325✔
117
                return &types.InternalServiceError{
×
118
                        Message: fmt.Sprintf("RecordWorkflowExecutionClosed rowsAffected error: %v", err),
×
119
                }
×
120
        }
×
121
        if noRowsAffected > 2 { // either adds a new row or deletes old row and adds new row
325✔
122
                return &types.InternalServiceError{
×
123
                        Message: fmt.Sprintf("RecordWorkflowExecutionClosed unexpected numRows (%v) updated", noRowsAffected),
×
124
                }
×
125
        }
×
126
        return nil
325✔
127
}
128

129
func (s *sqlVisibilityStore) RecordWorkflowExecutionUninitialized(
130
        ctx context.Context,
131
        request *p.InternalRecordWorkflowExecutionUninitializedRequest,
132
) error {
×
133
        // temporary: not implemented, only implemented for ES
×
134
        return nil
×
135
}
×
136

137
func (s *sqlVisibilityStore) UpsertWorkflowExecution(
138
        _ context.Context,
139
        request *p.InternalUpsertWorkflowExecutionRequest,
140
) error {
80✔
141
        if p.IsNopUpsertWorkflowRequest(request) {
80✔
142
                return nil
×
143
        }
×
144
        return p.ErrVisibilityOperationNotSupported
80✔
145
}
146

147
func (s *sqlVisibilityStore) ListOpenWorkflowExecutions(
148
        ctx context.Context,
149
        request *p.InternalListWorkflowExecutionsRequest,
150
) (*p.InternalListWorkflowExecutionsResponse, error) {
4✔
151
        return s.listWorkflowExecutions("ListOpenWorkflowExecutions", request.NextPageToken, request.EarliestTime, request.LatestTime,
4✔
152
                func(readLevel *visibilityPageToken) ([]sqlplugin.VisibilityRow, error) {
8✔
153
                        return s.db.SelectFromVisibility(ctx, &sqlplugin.VisibilityFilter{
4✔
154
                                DomainID:     request.DomainUUID,
4✔
155
                                MinStartTime: &request.EarliestTime,
4✔
156
                                MaxStartTime: &readLevel.Time,
4✔
157
                                RunID:        &readLevel.RunID,
4✔
158
                                PageSize:     &request.PageSize,
4✔
159
                        })
4✔
160
                })
4✔
161
}
162

163
func (s *sqlVisibilityStore) ListClosedWorkflowExecutions(
164
        ctx context.Context,
165
        request *p.InternalListWorkflowExecutionsRequest,
166
) (*p.InternalListWorkflowExecutionsResponse, error) {
6✔
167
        return s.listWorkflowExecutions("ListClosedWorkflowExecutions", request.NextPageToken, request.EarliestTime, request.LatestTime,
6✔
168
                func(readLevel *visibilityPageToken) ([]sqlplugin.VisibilityRow, error) {
12✔
169
                        return s.db.SelectFromVisibility(ctx, &sqlplugin.VisibilityFilter{
6✔
170
                                DomainID:     request.DomainUUID,
6✔
171
                                MinStartTime: &request.EarliestTime,
6✔
172
                                MaxStartTime: &readLevel.Time,
6✔
173
                                Closed:       true,
6✔
174
                                RunID:        &readLevel.RunID,
6✔
175
                                PageSize:     &request.PageSize,
6✔
176
                        })
6✔
177
                })
6✔
178
}
179

180
func (s *sqlVisibilityStore) ListOpenWorkflowExecutionsByType(
181
        ctx context.Context,
182
        request *p.InternalListWorkflowExecutionsByTypeRequest,
183
) (*p.InternalListWorkflowExecutionsResponse, error) {
×
184
        return s.listWorkflowExecutions("ListOpenWorkflowExecutionsByType", request.NextPageToken, request.EarliestTime, request.LatestTime,
×
185
                func(readLevel *visibilityPageToken) ([]sqlplugin.VisibilityRow, error) {
×
186
                        return s.db.SelectFromVisibility(ctx, &sqlplugin.VisibilityFilter{
×
187
                                DomainID:         request.DomainUUID,
×
188
                                MinStartTime:     &request.EarliestTime,
×
189
                                MaxStartTime:     &readLevel.Time,
×
190
                                RunID:            &readLevel.RunID,
×
191
                                WorkflowTypeName: &request.WorkflowTypeName,
×
192
                                PageSize:         &request.PageSize,
×
193
                        })
×
194
                })
×
195
}
196

197
func (s *sqlVisibilityStore) ListClosedWorkflowExecutionsByType(
198
        ctx context.Context,
199
        request *p.InternalListWorkflowExecutionsByTypeRequest,
200
) (*p.InternalListWorkflowExecutionsResponse, error) {
×
201
        return s.listWorkflowExecutions("ListClosedWorkflowExecutionsByType", request.NextPageToken, request.EarliestTime, request.LatestTime,
×
202
                func(readLevel *visibilityPageToken) ([]sqlplugin.VisibilityRow, error) {
×
203
                        return s.db.SelectFromVisibility(ctx, &sqlplugin.VisibilityFilter{
×
204
                                DomainID:         request.DomainUUID,
×
205
                                MinStartTime:     &request.EarliestTime,
×
206
                                MaxStartTime:     &readLevel.Time,
×
207
                                Closed:           true,
×
208
                                RunID:            &readLevel.RunID,
×
209
                                WorkflowTypeName: &request.WorkflowTypeName,
×
210
                                PageSize:         &request.PageSize,
×
211
                        })
×
212
                })
×
213
}
214

215
func (s *sqlVisibilityStore) ListOpenWorkflowExecutionsByWorkflowID(
216
        ctx context.Context,
217
        request *p.InternalListWorkflowExecutionsByWorkflowIDRequest,
218
) (*p.InternalListWorkflowExecutionsResponse, error) {
59✔
219
        return s.listWorkflowExecutions("ListOpenWorkflowExecutionsByWorkflowID", request.NextPageToken, request.EarliestTime, request.LatestTime,
59✔
220
                func(readLevel *visibilityPageToken) ([]sqlplugin.VisibilityRow, error) {
118✔
221
                        return s.db.SelectFromVisibility(ctx, &sqlplugin.VisibilityFilter{
59✔
222
                                DomainID:     request.DomainUUID,
59✔
223
                                MinStartTime: &request.EarliestTime,
59✔
224
                                MaxStartTime: &readLevel.Time,
59✔
225
                                RunID:        &readLevel.RunID,
59✔
226
                                WorkflowID:   &request.WorkflowID,
59✔
227
                                PageSize:     &request.PageSize,
59✔
228
                        })
59✔
229
                })
59✔
230
}
231

232
func (s *sqlVisibilityStore) ListClosedWorkflowExecutionsByWorkflowID(
233
        ctx context.Context,
234
        request *p.InternalListWorkflowExecutionsByWorkflowIDRequest,
235
) (*p.InternalListWorkflowExecutionsResponse, error) {
10✔
236
        return s.listWorkflowExecutions("ListClosedWorkflowExecutionsByWorkflowID", request.NextPageToken, request.EarliestTime, request.LatestTime,
10✔
237
                func(readLevel *visibilityPageToken) ([]sqlplugin.VisibilityRow, error) {
20✔
238
                        return s.db.SelectFromVisibility(ctx, &sqlplugin.VisibilityFilter{
10✔
239
                                DomainID:     request.DomainUUID,
10✔
240
                                MinStartTime: &request.EarliestTime,
10✔
241
                                MaxStartTime: &readLevel.Time,
10✔
242
                                Closed:       true,
10✔
243
                                RunID:        &readLevel.RunID,
10✔
244
                                WorkflowID:   &request.WorkflowID,
10✔
245
                                PageSize:     &request.PageSize,
10✔
246
                        })
10✔
247
                })
10✔
248
}
249

250
func (s *sqlVisibilityStore) ListClosedWorkflowExecutionsByStatus(
251
        ctx context.Context,
252
        request *p.InternalListClosedWorkflowExecutionsByStatusRequest,
253
) (*p.InternalListWorkflowExecutionsResponse, error) {
×
254
        return s.listWorkflowExecutions("ListClosedWorkflowExecutionsByStatus", request.NextPageToken, request.EarliestTime, request.LatestTime,
×
255
                func(readLevel *visibilityPageToken) ([]sqlplugin.VisibilityRow, error) {
×
256
                        return s.db.SelectFromVisibility(ctx, &sqlplugin.VisibilityFilter{
×
257
                                DomainID:     request.DomainUUID,
×
258
                                MinStartTime: &request.EarliestTime,
×
259
                                MaxStartTime: &readLevel.Time,
×
260
                                Closed:       true,
×
261
                                RunID:        &readLevel.RunID,
×
262
                                CloseStatus:  common.Int32Ptr(int32(*thrift.FromWorkflowExecutionCloseStatus(&request.Status))),
×
263
                                PageSize:     &request.PageSize,
×
264
                        })
×
265
                })
×
266
}
267

268
func (s *sqlVisibilityStore) GetClosedWorkflowExecution(
269
        ctx context.Context,
270
        request *p.InternalGetClosedWorkflowExecutionRequest,
271
) (*p.InternalGetClosedWorkflowExecutionResponse, error) {
×
272
        execution := request.Execution
×
273
        rows, err := s.db.SelectFromVisibility(ctx, &sqlplugin.VisibilityFilter{
×
274
                DomainID: request.DomainUUID,
×
275
                Closed:   true,
×
276
                RunID:    &execution.RunID,
×
277
        })
×
278
        if err != nil {
×
279
                if err == sql.ErrNoRows {
×
280
                        return nil, &types.EntityNotExistsError{
×
281
                                Message: fmt.Sprintf("Workflow execution not found.  WorkflowId: %v, RunId: %v",
×
282
                                        execution.GetWorkflowID(), execution.GetRunID()),
×
283
                        }
×
284
                }
×
285
                return nil, convertCommonErrors(s.db, "GetClosedWorkflowExecution", "", err)
×
286
        }
287
        rows[0].DomainID = request.DomainUUID
×
288
        rows[0].RunID = execution.GetRunID()
×
289
        rows[0].WorkflowID = execution.GetWorkflowID()
×
290
        return &p.InternalGetClosedWorkflowExecutionResponse{Execution: s.rowToInfo(&rows[0])}, nil
×
291
}
292

293
func (s *sqlVisibilityStore) DeleteWorkflowExecution(
294
        ctx context.Context,
295
        request *p.VisibilityDeleteWorkflowExecutionRequest,
296
) error {
36✔
297
        _, err := s.db.DeleteFromVisibility(ctx, &sqlplugin.VisibilityFilter{
36✔
298
                DomainID: request.DomainID,
36✔
299
                RunID:    &request.RunID,
36✔
300
        })
36✔
301
        if err != nil {
36✔
302
                return convertCommonErrors(s.db, "DeleteWorkflowExecution", "", err)
×
303
        }
×
304
        return nil
36✔
305
}
306

307
func (s *sqlVisibilityStore) DeleteUninitializedWorkflowExecution(
308
        ctx context.Context,
309
        request *p.VisibilityDeleteWorkflowExecutionRequest,
310
) error {
×
311
        // temporary: not implemented, only implemented for ES
×
312
        return nil
×
313
}
×
314

315
func (s *sqlVisibilityStore) ListWorkflowExecutions(
316
        _ context.Context,
317
        _ *p.ListWorkflowExecutionsByQueryRequest,
318
) (*p.InternalListWorkflowExecutionsResponse, error) {
×
319
        return nil, p.ErrVisibilityOperationNotSupported
×
320
}
×
321

322
func (s *sqlVisibilityStore) ScanWorkflowExecutions(
323
        _ context.Context,
324
        _ *p.ListWorkflowExecutionsByQueryRequest,
325
) (*p.InternalListWorkflowExecutionsResponse, error) {
×
326
        return nil, p.ErrVisibilityOperationNotSupported
×
327
}
×
328

329
func (s *sqlVisibilityStore) CountWorkflowExecutions(
330
        _ context.Context,
331
        _ *p.CountWorkflowExecutionsRequest,
332
) (*p.CountWorkflowExecutionsResponse, error) {
×
333
        return nil, p.ErrVisibilityOperationNotSupported
×
334
}
×
335

336
func (s *sqlVisibilityStore) rowToInfo(row *sqlplugin.VisibilityRow) *p.InternalVisibilityWorkflowExecutionInfo {
83✔
337
        if row.ExecutionTime.UnixNano() == 0 {
146✔
338
                row.ExecutionTime = row.StartTime
63✔
339
        }
63✔
340
        info := &p.InternalVisibilityWorkflowExecutionInfo{
83✔
341
                WorkflowID:    row.WorkflowID,
83✔
342
                RunID:         row.RunID,
83✔
343
                TypeName:      row.WorkflowTypeName,
83✔
344
                StartTime:     row.StartTime,
83✔
345
                ExecutionTime: row.ExecutionTime,
83✔
346
                IsCron:        row.IsCron,
83✔
347
                NumClusters:   row.NumClusters,
83✔
348
                Memo:          p.NewDataBlob(row.Memo, common.EncodingType(row.Encoding)),
83✔
349
                UpdateTime:    row.UpdateTime,
83✔
350
                ShardID:       row.ShardID,
83✔
351
        }
83✔
352
        if row.CloseStatus != nil {
111✔
353
                status := workflow.WorkflowExecutionCloseStatus(*row.CloseStatus)
28✔
354
                info.Status = thrift.ToWorkflowExecutionCloseStatus(&status)
28✔
355
                info.CloseTime = *row.CloseTime
28✔
356
                info.HistoryLength = *row.HistoryLength
28✔
357
        }
28✔
358
        return info
83✔
359
}
360

361
func (s *sqlVisibilityStore) listWorkflowExecutions(opName string, pageToken []byte, earliestTime time.Time, latestTime time.Time, selectOp func(readLevel *visibilityPageToken) ([]sqlplugin.VisibilityRow, error)) (*p.InternalListWorkflowExecutionsResponse, error) {
79✔
362
        var readLevel *visibilityPageToken
79✔
363
        var err error
79✔
364
        if len(pageToken) > 0 {
79✔
365
                readLevel, err = s.deserializePageToken(pageToken)
×
366
                if err != nil {
×
367
                        return nil, err
×
368
                }
×
369
        } else {
79✔
370
                readLevel = &visibilityPageToken{Time: latestTime, RunID: ""}
79✔
371
        }
79✔
372
        rows, err := selectOp(readLevel)
79✔
373
        if err != nil {
79✔
374
                return nil, convertCommonErrors(s.db, opName, "", err)
×
375
        }
×
376
        if len(rows) == 0 {
87✔
377
                return &p.InternalListWorkflowExecutionsResponse{}, nil
8✔
378
        }
8✔
379

380
        var infos = make([]*p.InternalVisibilityWorkflowExecutionInfo, len(rows))
71✔
381
        for i, row := range rows {
154✔
382
                infos[i] = s.rowToInfo(&row)
83✔
383
        }
83✔
384
        var nextPageToken []byte
71✔
385
        lastRow := rows[len(rows)-1]
71✔
386
        lastStartTime := lastRow.StartTime
71✔
387
        if lastStartTime.Sub(earliestTime).Nanoseconds() > 0 {
142✔
388
                nextPageToken, err = s.serializePageToken(&visibilityPageToken{
71✔
389
                        Time:  lastStartTime,
71✔
390
                        RunID: lastRow.RunID,
71✔
391
                })
71✔
392
                if err != nil {
71✔
393
                        return nil, err
×
394
                }
×
395
        }
396
        return &p.InternalListWorkflowExecutionsResponse{
71✔
397
                Executions:    infos,
71✔
398
                NextPageToken: nextPageToken,
71✔
399
        }, nil
71✔
400
}
401

402
func (s *sqlVisibilityStore) deserializePageToken(data []byte) (*visibilityPageToken, error) {
×
403
        var token visibilityPageToken
×
404
        err := json.Unmarshal(data, &token)
×
405
        return &token, err
×
406
}
×
407

408
func (s *sqlVisibilityStore) serializePageToken(token *visibilityPageToken) ([]byte, error) {
71✔
409
        data, err := json.Marshal(token)
71✔
410
        return data, err
71✔
411
}
71✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc