• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 01875e09-c86e-4746-913e-24de314b8c37

07 Apr 2023 11:44PM UTC coverage: 57.199% (+0.1%) from 57.104%
01875e09-c86e-4746-913e-24de314b8c37

push

buildkite

GitHub
enables CRUD operations for global isolation-groups (#5191)

145 of 145 new or added lines in 3 files covered. (100.0%)

85859 of 150106 relevant lines covered (57.2%)

2297.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

55.05
/common/persistence/sql/sqlDomainStore.go
1
// Copyright (c) 2018 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package sql
22

23
import (
24
        "context"
25
        "database/sql"
26
        "fmt"
27

28
        "github.com/uber/cadence/common"
29
        "github.com/uber/cadence/common/cluster"
30
        "github.com/uber/cadence/common/log"
31
        "github.com/uber/cadence/common/persistence"
32
        "github.com/uber/cadence/common/persistence/serialization"
33
        "github.com/uber/cadence/common/persistence/sql/sqlplugin"
34
        "github.com/uber/cadence/common/types"
35
)
36

37
type sqlDomainStore struct {
38
        sqlStore
39
        activeClusterName string
40
}
41

42
// newMetadataPersistenceV2 creates an instance of sqlDomainStore
43
func newMetadataPersistenceV2(
44
        db sqlplugin.DB,
45
        currentClusterName string,
46
        logger log.Logger,
47
        parser serialization.Parser,
48
) (persistence.DomainStore, error) {
34✔
49
        return &sqlDomainStore{
34✔
50
                sqlStore: sqlStore{
34✔
51
                        db:     db,
34✔
52
                        logger: logger,
34✔
53
                        parser: parser,
34✔
54
                },
34✔
55
                activeClusterName: currentClusterName,
34✔
56
        }, nil
34✔
57
}
34✔
58

59
func updateMetadata(ctx context.Context, tx sqlplugin.Tx, oldNotificationVersion int64) error {
42✔
60
        result, err := tx.UpdateDomainMetadata(ctx, &sqlplugin.DomainMetadataRow{NotificationVersion: oldNotificationVersion})
42✔
61
        if err != nil {
42✔
62
                return convertCommonErrors(tx, "updateDomainMetadata", "", err)
×
63
        }
×
64

65
        rowsAffected, err := result.RowsAffected()
42✔
66
        if err != nil {
42✔
67
                return &types.InternalServiceError{
×
68
                        Message: fmt.Sprintf("Could not verify whether domain metadata update occurred. Error: %v", err),
×
69
                }
×
70
        } else if rowsAffected != 1 {
42✔
71
                return &types.InternalServiceError{
×
72
                        Message: "Failed to update domain metadata. <>1 rows affected.",
×
73
                }
×
74
        }
×
75

76
        return nil
42✔
77
}
78

79
func lockMetadata(ctx context.Context, tx sqlplugin.Tx) error {
42✔
80
        err := tx.LockDomainMetadata(ctx)
42✔
81
        if err != nil {
42✔
82
                return convertCommonErrors(tx, "lockDomainMetadata", "", err)
×
83
        }
×
84
        return nil
42✔
85
}
86

87
func (m *sqlDomainStore) CreateDomain(
88
        ctx context.Context,
89
        request *persistence.InternalCreateDomainRequest,
90
) (*persistence.CreateDomainResponse, error) {
42✔
91
        metadata, err := m.GetMetadata(ctx)
42✔
92
        if err != nil {
42✔
93
                return nil, err
×
94
        }
×
95

96
        clusters := make([]string, len(request.ReplicationConfig.Clusters))
42✔
97
        for i := range clusters {
76✔
98
                clusters[i] = request.ReplicationConfig.Clusters[i].ClusterName
34✔
99
        }
34✔
100

101
        var badBinaries []byte
42✔
102
        badBinariesEncoding := string(common.EncodingTypeEmpty)
42✔
103
        if request.Config.BadBinaries != nil {
84✔
104
                badBinaries = request.Config.BadBinaries.Data
42✔
105
                badBinariesEncoding = string(request.Config.BadBinaries.GetEncoding())
42✔
106
        }
42✔
107

108
        domainInfo := &serialization.DomainInfo{
42✔
109
                Name:                        request.Info.Name,
42✔
110
                Status:                      int32(request.Info.Status),
42✔
111
                Description:                 request.Info.Description,
42✔
112
                Owner:                       request.Info.OwnerEmail,
42✔
113
                Data:                        request.Info.Data,
42✔
114
                Retention:                   request.Config.Retention,
42✔
115
                EmitMetric:                  request.Config.EmitMetric,
42✔
116
                ArchivalBucket:              request.Config.ArchivalBucket,
42✔
117
                ArchivalStatus:              int16(request.Config.ArchivalStatus),
42✔
118
                HistoryArchivalStatus:       int16(request.Config.HistoryArchivalStatus),
42✔
119
                HistoryArchivalURI:          request.Config.HistoryArchivalURI,
42✔
120
                VisibilityArchivalStatus:    int16(request.Config.VisibilityArchivalStatus),
42✔
121
                VisibilityArchivalURI:       request.Config.VisibilityArchivalURI,
42✔
122
                ActiveClusterName:           request.ReplicationConfig.ActiveClusterName,
42✔
123
                Clusters:                    clusters,
42✔
124
                ConfigVersion:               request.ConfigVersion,
42✔
125
                FailoverVersion:             request.FailoverVersion,
42✔
126
                NotificationVersion:         metadata.NotificationVersion,
42✔
127
                FailoverNotificationVersion: persistence.InitialFailoverNotificationVersion,
42✔
128
                PreviousFailoverVersion:     common.InitialPreviousFailoverVersion,
42✔
129
                LastUpdatedTimestamp:        request.LastUpdatedTime,
42✔
130
                BadBinaries:                 badBinaries,
42✔
131
                BadBinariesEncoding:         badBinariesEncoding,
42✔
132
        }
42✔
133

42✔
134
        blob, err := m.parser.DomainInfoToBlob(domainInfo)
42✔
135
        if err != nil {
42✔
136
                return nil, err
×
137
        }
×
138

139
        var resp *persistence.CreateDomainResponse
42✔
140
        err = m.txExecute(ctx, sqlplugin.DbDefaultShard, "CreateDomain", func(tx sqlplugin.Tx) error {
84✔
141
                if _, err1 := tx.InsertIntoDomain(ctx, &sqlplugin.DomainRow{
42✔
142
                        Name:         request.Info.Name,
42✔
143
                        ID:           serialization.MustParseUUID(request.Info.ID),
42✔
144
                        Data:         blob.Data,
42✔
145
                        DataEncoding: string(blob.Encoding),
42✔
146
                        IsGlobal:     request.IsGlobalDomain,
42✔
147
                }); err1 != nil {
42✔
148
                        if m.db.IsDupEntryError(err1) {
×
149
                                return &types.DomainAlreadyExistsError{
×
150
                                        Message: fmt.Sprintf("name: %v", request.Info.Name),
×
151
                                }
×
152
                        }
×
153
                        return err1
×
154
                }
155
                if err1 := lockMetadata(ctx, tx); err1 != nil {
42✔
156
                        return err1
×
157
                }
×
158
                if err1 := updateMetadata(ctx, tx, metadata.NotificationVersion); err1 != nil {
42✔
159
                        return err1
×
160
                }
×
161
                resp = &persistence.CreateDomainResponse{ID: request.Info.ID}
42✔
162
                return nil
42✔
163
        })
164
        return resp, err
42✔
165
}
166

167
func (m *sqlDomainStore) GetDomain(
168
        ctx context.Context,
169
        request *persistence.GetDomainRequest,
170
) (*persistence.InternalGetDomainResponse, error) {
524✔
171
        filter := &sqlplugin.DomainFilter{}
524✔
172
        switch {
524✔
173
        case request.Name != "" && request.ID != "":
×
174
                return nil, &types.BadRequestError{
×
175
                        Message: "GetDomain operation failed.  Both ID and Name specified in request.",
×
176
                }
×
177
        case request.Name != "":
524✔
178
                filter.Name = &request.Name
524✔
179
        case request.ID != "":
×
180
                filter.ID = serialization.UUIDPtr(serialization.MustParseUUID(request.ID))
×
181
        default:
×
182
                return nil, &types.BadRequestError{
×
183
                        Message: "GetDomain operation failed.  Both ID and Name are empty.",
×
184
                }
×
185
        }
186

187
        rows, err := m.db.SelectFromDomain(ctx, filter)
524✔
188
        if err != nil {
962✔
189
                switch err {
438✔
190
                case sql.ErrNoRows:
438✔
191
                        // We did not return in the above for-loop because there were no rows.
438✔
192
                        identity := request.Name
438✔
193
                        if len(request.ID) > 0 {
438✔
194
                                identity = request.ID
×
195
                        }
×
196

197
                        return nil, &types.EntityNotExistsError{
438✔
198
                                Message: fmt.Sprintf("Domain %s does not exist.", identity),
438✔
199
                        }
438✔
200
                default:
×
201
                        return nil, convertCommonErrors(m.db, "GetDomain", "", err)
×
202
                }
203
        }
204

205
        response, err := m.domainRowToGetDomainResponse(&rows[0])
88✔
206
        if err != nil {
88✔
207
                return nil, err
×
208
        }
×
209

210
        return response, nil
88✔
211
}
212

213
func (m *sqlDomainStore) domainRowToGetDomainResponse(row *sqlplugin.DomainRow) (*persistence.InternalGetDomainResponse, error) {
3,096✔
214
        domainInfo, err := m.parser.DomainInfoFromBlob(row.Data, row.DataEncoding)
3,096✔
215
        if err != nil {
3,096✔
216
                return nil, err
×
217
        }
×
218

219
        clusters := make([]*persistence.ClusterReplicationConfig, len(domainInfo.Clusters))
3,096✔
220
        for i := range domainInfo.Clusters {
5,562✔
221
                clusters[i] = &persistence.ClusterReplicationConfig{ClusterName: domainInfo.Clusters[i]}
2,466✔
222
        }
2,466✔
223

224
        var badBinaries *persistence.DataBlob
3,096✔
225
        if domainInfo.BadBinaries != nil {
6,192✔
226
                badBinaries = persistence.NewDataBlob(domainInfo.BadBinaries, common.EncodingType(domainInfo.GetBadBinariesEncoding()))
3,096✔
227
        }
3,096✔
228

229
        return &persistence.InternalGetDomainResponse{
3,096✔
230
                Info: &persistence.DomainInfo{
3,096✔
231
                        ID:          row.ID.String(),
3,096✔
232
                        Name:        row.Name,
3,096✔
233
                        Status:      int(domainInfo.GetStatus()),
3,096✔
234
                        Description: domainInfo.GetDescription(),
3,096✔
235
                        OwnerEmail:  domainInfo.GetOwner(),
3,096✔
236
                        Data:        domainInfo.GetData(),
3,096✔
237
                },
3,096✔
238
                Config: &persistence.InternalDomainConfig{
3,096✔
239
                        Retention:                domainInfo.GetRetention(),
3,096✔
240
                        EmitMetric:               domainInfo.GetEmitMetric(),
3,096✔
241
                        ArchivalBucket:           domainInfo.GetArchivalBucket(),
3,096✔
242
                        ArchivalStatus:           types.ArchivalStatus(domainInfo.GetArchivalStatus()),
3,096✔
243
                        HistoryArchivalStatus:    types.ArchivalStatus(domainInfo.GetHistoryArchivalStatus()),
3,096✔
244
                        HistoryArchivalURI:       domainInfo.GetHistoryArchivalURI(),
3,096✔
245
                        VisibilityArchivalStatus: types.ArchivalStatus(domainInfo.GetVisibilityArchivalStatus()),
3,096✔
246
                        VisibilityArchivalURI:    domainInfo.GetVisibilityArchivalURI(),
3,096✔
247
                        BadBinaries:              badBinaries,
3,096✔
248
                },
3,096✔
249
                ReplicationConfig: &persistence.DomainReplicationConfig{
3,096✔
250
                        ActiveClusterName: cluster.GetOrUseDefaultActiveCluster(m.activeClusterName, domainInfo.GetActiveClusterName()),
3,096✔
251
                        Clusters:          cluster.GetOrUseDefaultClusters(m.activeClusterName, clusters),
3,096✔
252
                },
3,096✔
253
                IsGlobalDomain:              row.IsGlobal,
3,096✔
254
                FailoverVersion:             domainInfo.GetFailoverVersion(),
3,096✔
255
                ConfigVersion:               domainInfo.GetConfigVersion(),
3,096✔
256
                NotificationVersion:         domainInfo.GetNotificationVersion(),
3,096✔
257
                FailoverNotificationVersion: domainInfo.GetFailoverNotificationVersion(),
3,096✔
258
                PreviousFailoverVersion:     domainInfo.GetPreviousFailoverVersion(),
3,096✔
259
                FailoverEndTime:             domainInfo.FailoverEndTimestamp,
3,096✔
260
                LastUpdatedTime:             domainInfo.GetLastUpdatedTimestamp(),
3,096✔
261
        }, nil
3,096✔
262
}
263

264
func (m *sqlDomainStore) UpdateDomain(
265
        ctx context.Context,
266
        request *persistence.InternalUpdateDomainRequest,
267
) error {
×
268

×
269
        clusters := make([]string, len(request.ReplicationConfig.Clusters))
×
270
        for i := range clusters {
×
271
                clusters[i] = request.ReplicationConfig.Clusters[i].ClusterName
×
272
        }
×
273

274
        var badBinaries []byte
×
275
        badBinariesEncoding := string(common.EncodingTypeEmpty)
×
276
        if request.Config.BadBinaries != nil {
×
277
                badBinaries = request.Config.BadBinaries.Data
×
278
                badBinariesEncoding = string(request.Config.BadBinaries.GetEncoding())
×
279
        }
×
280

281
        domainInfo := &serialization.DomainInfo{
×
282
                Status:                      int32(request.Info.Status),
×
283
                Description:                 request.Info.Description,
×
284
                Owner:                       request.Info.OwnerEmail,
×
285
                Data:                        request.Info.Data,
×
286
                Retention:                   request.Config.Retention,
×
287
                EmitMetric:                  request.Config.EmitMetric,
×
288
                ArchivalBucket:              request.Config.ArchivalBucket,
×
289
                ArchivalStatus:              int16(request.Config.ArchivalStatus),
×
290
                HistoryArchivalStatus:       int16(request.Config.HistoryArchivalStatus),
×
291
                HistoryArchivalURI:          request.Config.HistoryArchivalURI,
×
292
                VisibilityArchivalStatus:    int16(request.Config.VisibilityArchivalStatus),
×
293
                VisibilityArchivalURI:       request.Config.VisibilityArchivalURI,
×
294
                ActiveClusterName:           request.ReplicationConfig.ActiveClusterName,
×
295
                Clusters:                    clusters,
×
296
                ConfigVersion:               request.ConfigVersion,
×
297
                FailoverVersion:             request.FailoverVersion,
×
298
                NotificationVersion:         request.NotificationVersion,
×
299
                FailoverNotificationVersion: request.FailoverNotificationVersion,
×
300
                PreviousFailoverVersion:     request.PreviousFailoverVersion,
×
301
                FailoverEndTimestamp:        request.FailoverEndTime,
×
302
                LastUpdatedTimestamp:        request.LastUpdatedTime,
×
303
                BadBinaries:                 badBinaries,
×
304
                BadBinariesEncoding:         badBinariesEncoding,
×
305
        }
×
306

×
307
        blob, err := m.parser.DomainInfoToBlob(domainInfo)
×
308
        if err != nil {
×
309
                return err
×
310
        }
×
311

312
        return m.txExecute(ctx, sqlplugin.DbDefaultShard, "UpdateDomain", func(tx sqlplugin.Tx) error {
×
313
                result, err := tx.UpdateDomain(ctx, &sqlplugin.DomainRow{
×
314
                        Name:         request.Info.Name,
×
315
                        ID:           serialization.MustParseUUID(request.Info.ID),
×
316
                        Data:         blob.Data,
×
317
                        DataEncoding: string(blob.Encoding),
×
318
                })
×
319
                if err != nil {
×
320
                        return err
×
321
                }
×
322
                noRowsAffected, err := result.RowsAffected()
×
323
                if err != nil {
×
324
                        return fmt.Errorf("rowsAffected error: %v", err)
×
325
                }
×
326
                if noRowsAffected != 1 {
×
327
                        return fmt.Errorf("%v rows updated instead of one", noRowsAffected)
×
328
                }
×
329
                if err := lockMetadata(ctx, tx); err != nil {
×
330
                        return err
×
331
                }
×
332
                return updateMetadata(ctx, tx, request.NotificationVersion)
×
333
        })
334
}
335

336
func (m *sqlDomainStore) DeleteDomain(
337
        ctx context.Context,
338
        request *persistence.DeleteDomainRequest,
339
) error {
×
340
        return m.txExecute(ctx, sqlplugin.DbDefaultShard, "DeleteDomain", func(tx sqlplugin.Tx) error {
×
341
                _, err := tx.DeleteFromDomain(ctx, &sqlplugin.DomainFilter{ID: serialization.UUIDPtr(serialization.MustParseUUID(request.ID))})
×
342
                return err
×
343
        })
×
344
}
345

346
func (m *sqlDomainStore) DeleteDomainByName(
347
        ctx context.Context,
348
        request *persistence.DeleteDomainByNameRequest,
349
) error {
×
350
        return m.txExecute(ctx, sqlplugin.DbDefaultShard, "DeleteDomainByName", func(tx sqlplugin.Tx) error {
×
351
                _, err := tx.DeleteFromDomain(ctx, &sqlplugin.DomainFilter{Name: &request.Name})
×
352
                return err
×
353
        })
×
354
}
355

356
func (m *sqlDomainStore) GetMetadata(
357
        ctx context.Context,
358
) (*persistence.GetMetadataResponse, error) {
666✔
359
        row, err := m.db.SelectFromDomainMetadata(ctx)
666✔
360
        if err != nil {
666✔
361
                return nil, convertCommonErrors(m.db, "GetMetadata", "", err)
×
362
        }
×
363
        return &persistence.GetMetadataResponse{NotificationVersion: row.NotificationVersion}, nil
666✔
364
}
365

366
func (m *sqlDomainStore) ListDomains(
367
        ctx context.Context,
368
        request *persistence.ListDomainsRequest,
369
) (*persistence.InternalListDomainsResponse, error) {
626✔
370
        var pageToken *serialization.UUID
626✔
371
        if request.NextPageToken != nil {
626✔
372
                token := serialization.UUID(request.NextPageToken)
×
373
                pageToken = &token
×
374
        }
×
375
        rows, err := m.db.SelectFromDomain(ctx, &sqlplugin.DomainFilter{
626✔
376
                GreaterThanID: pageToken,
626✔
377
                PageSize:      &request.PageSize,
626✔
378
        })
626✔
379
        if err != nil {
626✔
380
                if err == sql.ErrNoRows {
×
381
                        return &persistence.InternalListDomainsResponse{}, nil
×
382
                }
×
383
                return nil, convertCommonErrors(m.db, "ListDomains", "Failed to get domain rows.", err)
×
384
        }
385

386
        var domains []*persistence.InternalGetDomainResponse
626✔
387
        for _, row := range rows {
3,636✔
388
                resp, err := m.domainRowToGetDomainResponse(&row)
3,010✔
389
                if err != nil {
3,010✔
390
                        return nil, err
×
391
                }
×
392
                domains = append(domains, resp)
3,010✔
393
        }
394

395
        resp := &persistence.InternalListDomainsResponse{Domains: domains}
626✔
396
        if len(rows) >= request.PageSize {
626✔
397
                resp.NextPageToken = rows[len(rows)-1].ID
×
398
        }
×
399

400
        return resp, nil
626✔
401
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc