• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 0186bdce-5e78-4bd9-9a33-eb4f2319ec8d

07 Mar 2023 09:06PM UTC coverage: 57.058% (-0.1%) from 57.174%
0186bdce-5e78-4bd9-9a33-eb4f2319ec8d

push

buildkite

GitHub
move sample logger into persistence metric client for cleaness (#5129)

12 of 12 new or added lines in 5 files covered. (100.0%)

85162 of 149256 relevant lines covered (57.06%)

2249.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.92
/common/persistence/sql/sqlplugin/postgres/visibility.go
1
// Copyright (c) 2019 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package postgres
22

23
import (
24
        "context"
25
        "database/sql"
26
        "errors"
27
        "fmt"
28
        "strings"
29

30
        "github.com/uber/cadence/common/persistence/sql/sqlplugin"
31
)
32

33
const (
34
        templateCreateWorkflowExecutionStarted = `INSERT INTO executions_visibility (` +
35
                `domain_id, workflow_id, run_id, start_time, execution_time, workflow_type_name, memo, encoding, is_cron, num_clusters, update_time, shard_id) ` +
36
                `VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
37
         ON CONFLICT (domain_id, run_id) DO NOTHING`
38

39
        templateCreateWorkflowExecutionClosed = `INSERT INTO executions_visibility (` +
40
                `domain_id, workflow_id, run_id, start_time, execution_time, workflow_type_name, close_time, close_status, history_length, memo, encoding, is_cron, num_clusters, update_time, shard_id) ` +
41
                `VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15)
42
                ON CONFLICT (domain_id, run_id) DO UPDATE
43
                  SET workflow_id = excluded.workflow_id,
44
                      start_time = excluded.start_time,
45
                      execution_time = excluded.execution_time,
46
              workflow_type_name = excluded.workflow_type_name,
47
                          close_time = excluded.close_time,
48
                          close_status = excluded.close_status,
49
                          history_length = excluded.history_length,
50
                          memo = excluded.memo,
51
                          encoding = excluded.encoding,
52
                                is_cron = excluded.is_cron,
53
                                num_clusters = excluded.num_clusters,
54
                                update_time = excluded.update_time,
55
                                shard_id = excluded.shard_id`
56

57
        // RunID condition is needed for correct pagination
58
        templateConditions1 = ` AND domain_id = $1
59
                 AND start_time >= $2
60
                 AND start_time <= $3
61
                  AND (run_id > $4 OR start_time < $5)
62
         ORDER BY start_time DESC, run_id
63
         LIMIT $6`
64

65
        templateConditions2 = ` AND domain_id = $2
66
                 AND start_time >= $3
67
                 AND start_time <= $4
68
                  AND (run_id > $5 OR start_time < $6)
69
         ORDER BY start_time DESC, run_id
70
         LIMIT $7`
71

72
        templateOpenFieldNames = `workflow_id, run_id, start_time, execution_time, workflow_type_name, memo, encoding, is_cron, update_time, shard_id`
73
        templateOpenSelect     = `SELECT ` + templateOpenFieldNames + ` FROM executions_visibility WHERE close_status IS NULL `
74

75
        templateClosedSelect = `SELECT ` + templateOpenFieldNames + `, close_time, close_status, history_length
76
                 FROM executions_visibility WHERE close_status IS NOT NULL `
77

78
        templateGetOpenWorkflowExecutions = templateOpenSelect + templateConditions1
79

80
        templateGetClosedWorkflowExecutions = templateClosedSelect + templateConditions1
81

82
        templateGetOpenWorkflowExecutionsByType = templateOpenSelect + `AND workflow_type_name = $1` + templateConditions2
83

84
        templateGetClosedWorkflowExecutionsByType = templateClosedSelect + `AND workflow_type_name = $1` + templateConditions2
85

86
        templateGetOpenWorkflowExecutionsByID = templateOpenSelect + `AND workflow_id = $1` + templateConditions2
87

88
        templateGetClosedWorkflowExecutionsByID = templateClosedSelect + `AND workflow_id = $1` + templateConditions2
89

90
        templateGetClosedWorkflowExecutionsByStatus = templateClosedSelect + `AND close_status = $1` + templateConditions2
91

92
        templateGetClosedWorkflowExecution = `SELECT workflow_id, run_id, start_time, execution_time, memo, encoding, close_time, workflow_type_name, close_status, history_length, is_cron, update_time, shard_id
93
                 FROM executions_visibility
94
                 WHERE domain_id = $1 AND close_status IS NOT NULL
95
                 AND run_id = $2`
96

97
        templateDeleteWorkflowExecution = "DELETE FROM executions_visibility WHERE domain_id=$1 AND run_id=$2"
98
)
99

100
var errCloseParams = errors.New("missing one of {closeStatus, closeTime, historyLength} params")
101

102
// InsertIntoVisibility inserts a row into visibility table. If an row already exist,
103
// its left as such and no update will be made
104
func (pdb *db) InsertIntoVisibility(ctx context.Context, row *sqlplugin.VisibilityRow) (sql.Result, error) {
232✔
105
        dbShardID := sqlplugin.GetDBShardIDFromDomainID(row.DomainID, pdb.GetTotalNumDBShards())
232✔
106
        row.StartTime = pdb.converter.ToPostgresDateTime(row.StartTime)
232✔
107
        return pdb.driver.ExecContext(ctx, dbShardID, templateCreateWorkflowExecutionStarted,
232✔
108
                row.DomainID,
232✔
109
                row.WorkflowID,
232✔
110
                row.RunID,
232✔
111
                row.StartTime,
232✔
112
                row.ExecutionTime,
232✔
113
                row.WorkflowTypeName,
232✔
114
                row.Memo,
232✔
115
                row.Encoding,
232✔
116
                row.IsCron,
232✔
117
                row.NumClusters,
232✔
118
                row.UpdateTime,
232✔
119
                row.ShardID)
232✔
120
}
232✔
121

122
// ReplaceIntoVisibility replaces an existing row if it exist or creates a new row in visibility table
123
func (pdb *db) ReplaceIntoVisibility(ctx context.Context, row *sqlplugin.VisibilityRow) (sql.Result, error) {
185✔
124
        dbShardID := sqlplugin.GetDBShardIDFromDomainID(row.DomainID, pdb.GetTotalNumDBShards())
185✔
125
        switch {
185✔
126
        case row.CloseStatus != nil && row.CloseTime != nil && row.HistoryLength != nil:
185✔
127
                row.StartTime = pdb.converter.ToPostgresDateTime(row.StartTime)
185✔
128
                closeTime := pdb.converter.ToPostgresDateTime(*row.CloseTime)
185✔
129
                return pdb.driver.ExecContext(ctx, dbShardID, templateCreateWorkflowExecutionClosed,
185✔
130
                        row.DomainID,
185✔
131
                        row.WorkflowID,
185✔
132
                        row.RunID,
185✔
133
                        row.StartTime,
185✔
134
                        row.ExecutionTime,
185✔
135
                        row.WorkflowTypeName,
185✔
136
                        closeTime,
185✔
137
                        *row.CloseStatus,
185✔
138
                        *row.HistoryLength,
185✔
139
                        row.Memo,
185✔
140
                        row.Encoding,
185✔
141
                        row.IsCron,
185✔
142
                        row.NumClusters,
185✔
143
                        row.UpdateTime,
185✔
144
                        row.ShardID)
185✔
145
        default:
×
146
                return nil, errCloseParams
×
147
        }
148
}
149

150
// DeleteFromVisibility deletes a row from visibility table if it exist
151
func (pdb *db) DeleteFromVisibility(ctx context.Context, filter *sqlplugin.VisibilityFilter) (sql.Result, error) {
23✔
152
        dbShardID := sqlplugin.GetDBShardIDFromDomainID(filter.DomainID, pdb.GetTotalNumDBShards())
23✔
153
        return pdb.driver.ExecContext(ctx, dbShardID, templateDeleteWorkflowExecution, filter.DomainID, filter.RunID)
23✔
154
}
23✔
155

156
// SelectFromVisibility reads one or more rows from visibility table
157
func (pdb *db) SelectFromVisibility(ctx context.Context, filter *sqlplugin.VisibilityFilter) ([]sqlplugin.VisibilityRow, error) {
67✔
158
        dbShardID := sqlplugin.GetDBShardIDFromDomainID(filter.DomainID, pdb.GetTotalNumDBShards())
67✔
159
        var err error
67✔
160
        var rows []sqlplugin.VisibilityRow
67✔
161
        if filter.MinStartTime != nil {
129✔
162
                *filter.MinStartTime = pdb.converter.ToPostgresDateTime(*filter.MinStartTime)
62✔
163
        }
62✔
164
        if filter.MaxStartTime != nil {
129✔
165
                *filter.MaxStartTime = pdb.converter.ToPostgresDateTime(*filter.MaxStartTime)
62✔
166
        }
62✔
167
        switch {
67✔
168
        case filter.MinStartTime == nil && filter.RunID != nil && filter.Closed:
5✔
169
                var row sqlplugin.VisibilityRow
5✔
170
                err = pdb.driver.GetContext(ctx, dbShardID, &row, templateGetClosedWorkflowExecution, filter.DomainID, *filter.RunID)
5✔
171
                if err == nil {
8✔
172
                        rows = append(rows, row)
3✔
173
                }
3✔
174
        case filter.MinStartTime != nil && filter.WorkflowID != nil:
37✔
175
                qry := templateGetOpenWorkflowExecutionsByID
37✔
176
                if filter.Closed {
43✔
177
                        qry = templateGetClosedWorkflowExecutionsByID
6✔
178
                }
6✔
179
                err = pdb.driver.SelectContext(ctx, dbShardID, &rows,
37✔
180
                        qry,
37✔
181
                        *filter.WorkflowID,
37✔
182
                        filter.DomainID,
37✔
183
                        pdb.converter.ToPostgresDateTime(*filter.MinStartTime),
37✔
184
                        pdb.converter.ToPostgresDateTime(*filter.MaxStartTime),
37✔
185
                        *filter.RunID,
37✔
186
                        *filter.MinStartTime,
37✔
187
                        *filter.PageSize)
37✔
188
        case filter.MinStartTime != nil && filter.WorkflowTypeName != nil:
2✔
189
                qry := templateGetOpenWorkflowExecutionsByType
2✔
190
                if filter.Closed {
3✔
191
                        qry = templateGetClosedWorkflowExecutionsByType
1✔
192
                }
1✔
193
                err = pdb.driver.SelectContext(ctx, dbShardID, &rows,
2✔
194
                        qry,
2✔
195
                        *filter.WorkflowTypeName,
2✔
196
                        filter.DomainID,
2✔
197
                        pdb.converter.ToPostgresDateTime(*filter.MinStartTime),
2✔
198
                        pdb.converter.ToPostgresDateTime(*filter.MaxStartTime),
2✔
199
                        *filter.RunID,
2✔
200
                        *filter.MaxStartTime,
2✔
201
                        *filter.PageSize)
2✔
202
        case filter.MinStartTime != nil && filter.CloseStatus != nil:
1✔
203
                err = pdb.driver.SelectContext(ctx, dbShardID, &rows,
1✔
204
                        templateGetClosedWorkflowExecutionsByStatus,
1✔
205
                        *filter.CloseStatus,
1✔
206
                        filter.DomainID,
1✔
207
                        pdb.converter.ToPostgresDateTime(*filter.MinStartTime),
1✔
208
                        pdb.converter.ToPostgresDateTime(*filter.MaxStartTime),
1✔
209
                        *filter.RunID,
1✔
210
                        pdb.converter.ToPostgresDateTime(*filter.MaxStartTime),
1✔
211
                        *filter.PageSize)
1✔
212
        case filter.MinStartTime != nil:
22✔
213
                qry := templateGetOpenWorkflowExecutions
22✔
214
                if filter.Closed {
34✔
215
                        qry = templateGetClosedWorkflowExecutions
12✔
216
                }
12✔
217
                minSt := pdb.converter.ToPostgresDateTime(*filter.MinStartTime)
22✔
218
                maxSt := pdb.converter.ToPostgresDateTime(*filter.MaxStartTime)
22✔
219
                err = pdb.driver.SelectContext(ctx, dbShardID, &rows,
22✔
220
                        qry,
22✔
221
                        filter.DomainID,
22✔
222
                        minSt,
22✔
223
                        maxSt,
22✔
224
                        *filter.RunID,
22✔
225
                        maxSt,
22✔
226
                        *filter.PageSize)
22✔
227
        default:
×
228
                return nil, fmt.Errorf("invalid query filter")
×
229
        }
230
        if err != nil {
69✔
231
                return nil, err
2✔
232
        }
2✔
233
        for i := range rows {
138✔
234
                rows[i].StartTime = pdb.converter.FromPostgresDateTime(rows[i].StartTime)
73✔
235
                rows[i].ExecutionTime = pdb.converter.FromPostgresDateTime(rows[i].ExecutionTime)
73✔
236
                if rows[i].CloseTime != nil {
111✔
237
                        closeTime := pdb.converter.FromPostgresDateTime(*rows[i].CloseTime)
38✔
238
                        rows[i].CloseTime = &closeTime
38✔
239
                }
38✔
240
                rows[i].RunID = strings.TrimSpace(rows[i].RunID)
73✔
241
                rows[i].WorkflowID = strings.TrimSpace(rows[i].WorkflowID)
73✔
242
        }
243
        return rows, err
65✔
244
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc