• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 018802e6-6913-48bd-a84d-dbf07b294aa6

10 May 2023 12:02AM UTC coverage: 57.212% (+0.06%) from 57.152%
018802e6-6913-48bd-a84d-dbf07b294aa6

push

buildkite

GitHub
Adds isolation groups to persistence (#5270)

160 of 160 new or added lines in 8 files covered. (100.0%)

86182 of 150636 relevant lines covered (57.21%)

2426.13 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

55.52
/common/persistence/sql/sqlDomainStore.go
1
// Copyright (c) 2018 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package sql
22

23
import (
24
        "context"
25
        "database/sql"
26
        "fmt"
27

28
        "github.com/uber/cadence/common"
29
        "github.com/uber/cadence/common/cluster"
30
        "github.com/uber/cadence/common/log"
31
        "github.com/uber/cadence/common/persistence"
32
        "github.com/uber/cadence/common/persistence/serialization"
33
        "github.com/uber/cadence/common/persistence/sql/sqlplugin"
34
        "github.com/uber/cadence/common/types"
35
)
36

37
type sqlDomainStore struct {
38
        sqlStore
39
        activeClusterName string
40
}
41

42
// newMetadataPersistenceV2 creates an instance of sqlDomainStore
43
func newMetadataPersistenceV2(
44
        db sqlplugin.DB,
45
        currentClusterName string,
46
        logger log.Logger,
47
        parser serialization.Parser,
48
) (persistence.DomainStore, error) {
34✔
49
        return &sqlDomainStore{
34✔
50
                sqlStore: sqlStore{
34✔
51
                        db:     db,
34✔
52
                        logger: logger,
34✔
53
                        parser: parser,
34✔
54
                },
34✔
55
                activeClusterName: currentClusterName,
34✔
56
        }, nil
34✔
57
}
34✔
58

59
func updateMetadata(ctx context.Context, tx sqlplugin.Tx, oldNotificationVersion int64) error {
42✔
60
        result, err := tx.UpdateDomainMetadata(ctx, &sqlplugin.DomainMetadataRow{NotificationVersion: oldNotificationVersion})
42✔
61
        if err != nil {
42✔
62
                return convertCommonErrors(tx, "updateDomainMetadata", "", err)
×
63
        }
×
64

65
        rowsAffected, err := result.RowsAffected()
42✔
66
        if err != nil {
42✔
67
                return &types.InternalServiceError{
×
68
                        Message: fmt.Sprintf("Could not verify whether domain metadata update occurred. Error: %v", err),
×
69
                }
×
70
        } else if rowsAffected != 1 {
42✔
71
                return &types.InternalServiceError{
×
72
                        Message: "Failed to update domain metadata. <>1 rows affected.",
×
73
                }
×
74
        }
×
75

76
        return nil
42✔
77
}
78

79
func lockMetadata(ctx context.Context, tx sqlplugin.Tx) error {
42✔
80
        err := tx.LockDomainMetadata(ctx)
42✔
81
        if err != nil {
42✔
82
                return convertCommonErrors(tx, "lockDomainMetadata", "", err)
×
83
        }
×
84
        return nil
42✔
85
}
86

87
func (m *sqlDomainStore) CreateDomain(
88
        ctx context.Context,
89
        request *persistence.InternalCreateDomainRequest,
90
) (*persistence.CreateDomainResponse, error) {
42✔
91
        metadata, err := m.GetMetadata(ctx)
42✔
92
        if err != nil {
42✔
93
                return nil, err
×
94
        }
×
95

96
        clusters := make([]string, len(request.ReplicationConfig.Clusters))
42✔
97
        for i := range clusters {
76✔
98
                clusters[i] = request.ReplicationConfig.Clusters[i].ClusterName
34✔
99
        }
34✔
100

101
        var badBinaries []byte
42✔
102
        badBinariesEncoding := string(common.EncodingTypeEmpty)
42✔
103
        if request.Config.BadBinaries != nil {
84✔
104
                badBinaries = request.Config.BadBinaries.Data
42✔
105
                badBinariesEncoding = string(request.Config.BadBinaries.GetEncoding())
42✔
106
        }
42✔
107

108
        var isolationGroups []byte
42✔
109
        var isolationGroupsEncoding string
42✔
110
        if request.Config != nil && request.Config.IsolationGroups != nil {
84✔
111
                isolationGroups = request.Config.IsolationGroups.GetData()
42✔
112
                isolationGroupsEncoding = request.Config.IsolationGroups.GetEncodingString()
42✔
113
        }
42✔
114

115
        domainInfo := &serialization.DomainInfo{
42✔
116
                Name:                        request.Info.Name,
42✔
117
                Status:                      int32(request.Info.Status),
42✔
118
                Description:                 request.Info.Description,
42✔
119
                Owner:                       request.Info.OwnerEmail,
42✔
120
                Data:                        request.Info.Data,
42✔
121
                Retention:                   request.Config.Retention,
42✔
122
                EmitMetric:                  request.Config.EmitMetric,
42✔
123
                ArchivalBucket:              request.Config.ArchivalBucket,
42✔
124
                ArchivalStatus:              int16(request.Config.ArchivalStatus),
42✔
125
                HistoryArchivalStatus:       int16(request.Config.HistoryArchivalStatus),
42✔
126
                HistoryArchivalURI:          request.Config.HistoryArchivalURI,
42✔
127
                VisibilityArchivalStatus:    int16(request.Config.VisibilityArchivalStatus),
42✔
128
                VisibilityArchivalURI:       request.Config.VisibilityArchivalURI,
42✔
129
                ActiveClusterName:           request.ReplicationConfig.ActiveClusterName,
42✔
130
                Clusters:                    clusters,
42✔
131
                ConfigVersion:               request.ConfigVersion,
42✔
132
                FailoverVersion:             request.FailoverVersion,
42✔
133
                NotificationVersion:         metadata.NotificationVersion,
42✔
134
                FailoverNotificationVersion: persistence.InitialFailoverNotificationVersion,
42✔
135
                PreviousFailoverVersion:     common.InitialPreviousFailoverVersion,
42✔
136
                LastUpdatedTimestamp:        request.LastUpdatedTime,
42✔
137
                BadBinaries:                 badBinaries,
42✔
138
                BadBinariesEncoding:         badBinariesEncoding,
42✔
139
                IsolationGroups:             isolationGroups,
42✔
140
                IsolationGroupsEncoding:     isolationGroupsEncoding,
42✔
141
        }
42✔
142

42✔
143
        blob, err := m.parser.DomainInfoToBlob(domainInfo)
42✔
144
        if err != nil {
42✔
145
                return nil, err
×
146
        }
×
147

148
        var resp *persistence.CreateDomainResponse
42✔
149
        err = m.txExecute(ctx, sqlplugin.DbDefaultShard, "CreateDomain", func(tx sqlplugin.Tx) error {
84✔
150
                if _, err1 := tx.InsertIntoDomain(ctx, &sqlplugin.DomainRow{
42✔
151
                        Name:         request.Info.Name,
42✔
152
                        ID:           serialization.MustParseUUID(request.Info.ID),
42✔
153
                        Data:         blob.Data,
42✔
154
                        DataEncoding: string(blob.Encoding),
42✔
155
                        IsGlobal:     request.IsGlobalDomain,
42✔
156
                }); err1 != nil {
42✔
157
                        if m.db.IsDupEntryError(err1) {
×
158
                                return &types.DomainAlreadyExistsError{
×
159
                                        Message: fmt.Sprintf("name: %v", request.Info.Name),
×
160
                                }
×
161
                        }
×
162
                        return err1
×
163
                }
164
                if err1 := lockMetadata(ctx, tx); err1 != nil {
42✔
165
                        return err1
×
166
                }
×
167
                if err1 := updateMetadata(ctx, tx, metadata.NotificationVersion); err1 != nil {
42✔
168
                        return err1
×
169
                }
×
170
                resp = &persistence.CreateDomainResponse{ID: request.Info.ID}
42✔
171
                return nil
42✔
172
        })
173
        return resp, err
42✔
174
}
175

176
func (m *sqlDomainStore) GetDomain(
177
        ctx context.Context,
178
        request *persistence.GetDomainRequest,
179
) (*persistence.InternalGetDomainResponse, error) {
523✔
180
        filter := &sqlplugin.DomainFilter{}
523✔
181
        switch {
523✔
182
        case request.Name != "" && request.ID != "":
×
183
                return nil, &types.BadRequestError{
×
184
                        Message: "GetDomain operation failed.  Both ID and Name specified in request.",
×
185
                }
×
186
        case request.Name != "":
523✔
187
                filter.Name = &request.Name
523✔
188
        case request.ID != "":
×
189
                filter.ID = serialization.UUIDPtr(serialization.MustParseUUID(request.ID))
×
190
        default:
×
191
                return nil, &types.BadRequestError{
×
192
                        Message: "GetDomain operation failed.  Both ID and Name are empty.",
×
193
                }
×
194
        }
195

196
        rows, err := m.db.SelectFromDomain(ctx, filter)
523✔
197
        if err != nil {
960✔
198
                switch err {
437✔
199
                case sql.ErrNoRows:
437✔
200
                        // We did not return in the above for-loop because there were no rows.
437✔
201
                        identity := request.Name
437✔
202
                        if len(request.ID) > 0 {
437✔
203
                                identity = request.ID
×
204
                        }
×
205

206
                        return nil, &types.EntityNotExistsError{
437✔
207
                                Message: fmt.Sprintf("Domain %s does not exist.", identity),
437✔
208
                        }
437✔
209
                default:
×
210
                        return nil, convertCommonErrors(m.db, "GetDomain", "", err)
×
211
                }
212
        }
213

214
        response, err := m.domainRowToGetDomainResponse(&rows[0])
88✔
215
        if err != nil {
88✔
216
                return nil, err
×
217
        }
×
218

219
        return response, nil
88✔
220
}
221

222
func (m *sqlDomainStore) domainRowToGetDomainResponse(row *sqlplugin.DomainRow) (*persistence.InternalGetDomainResponse, error) {
3,096✔
223
        domainInfo, err := m.parser.DomainInfoFromBlob(row.Data, row.DataEncoding)
3,096✔
224
        if err != nil {
3,096✔
225
                return nil, err
×
226
        }
×
227

228
        clusters := make([]*persistence.ClusterReplicationConfig, len(domainInfo.Clusters))
3,096✔
229
        for i := range domainInfo.Clusters {
5,562✔
230
                clusters[i] = &persistence.ClusterReplicationConfig{ClusterName: domainInfo.Clusters[i]}
2,466✔
231
        }
2,466✔
232

233
        var badBinaries *persistence.DataBlob
3,096✔
234
        if domainInfo.BadBinaries != nil {
6,192✔
235
                badBinaries = persistence.NewDataBlob(domainInfo.BadBinaries, common.EncodingType(domainInfo.GetBadBinariesEncoding()))
3,096✔
236
        }
3,096✔
237

238
        var isolationGroups *persistence.DataBlob
3,096✔
239
        if domainInfo.IsolationGroups != nil {
6,192✔
240
                isolationGroups = persistence.NewDataBlob(domainInfo.IsolationGroups, common.EncodingType(domainInfo.IsolationGroupsEncoding))
3,096✔
241
        }
3,096✔
242

243
        return &persistence.InternalGetDomainResponse{
3,096✔
244
                Info: &persistence.DomainInfo{
3,096✔
245
                        ID:          row.ID.String(),
3,096✔
246
                        Name:        row.Name,
3,096✔
247
                        Status:      int(domainInfo.GetStatus()),
3,096✔
248
                        Description: domainInfo.GetDescription(),
3,096✔
249
                        OwnerEmail:  domainInfo.GetOwner(),
3,096✔
250
                        Data:        domainInfo.GetData(),
3,096✔
251
                },
3,096✔
252
                Config: &persistence.InternalDomainConfig{
3,096✔
253
                        Retention:                domainInfo.GetRetention(),
3,096✔
254
                        EmitMetric:               domainInfo.GetEmitMetric(),
3,096✔
255
                        ArchivalBucket:           domainInfo.GetArchivalBucket(),
3,096✔
256
                        ArchivalStatus:           types.ArchivalStatus(domainInfo.GetArchivalStatus()),
3,096✔
257
                        HistoryArchivalStatus:    types.ArchivalStatus(domainInfo.GetHistoryArchivalStatus()),
3,096✔
258
                        HistoryArchivalURI:       domainInfo.GetHistoryArchivalURI(),
3,096✔
259
                        VisibilityArchivalStatus: types.ArchivalStatus(domainInfo.GetVisibilityArchivalStatus()),
3,096✔
260
                        VisibilityArchivalURI:    domainInfo.GetVisibilityArchivalURI(),
3,096✔
261
                        BadBinaries:              badBinaries,
3,096✔
262
                        IsolationGroups:          isolationGroups,
3,096✔
263
                },
3,096✔
264
                ReplicationConfig: &persistence.DomainReplicationConfig{
3,096✔
265
                        ActiveClusterName: cluster.GetOrUseDefaultActiveCluster(m.activeClusterName, domainInfo.GetActiveClusterName()),
3,096✔
266
                        Clusters:          cluster.GetOrUseDefaultClusters(m.activeClusterName, clusters),
3,096✔
267
                },
3,096✔
268
                IsGlobalDomain:              row.IsGlobal,
3,096✔
269
                FailoverVersion:             domainInfo.GetFailoverVersion(),
3,096✔
270
                ConfigVersion:               domainInfo.GetConfigVersion(),
3,096✔
271
                NotificationVersion:         domainInfo.GetNotificationVersion(),
3,096✔
272
                FailoverNotificationVersion: domainInfo.GetFailoverNotificationVersion(),
3,096✔
273
                PreviousFailoverVersion:     domainInfo.GetPreviousFailoverVersion(),
3,096✔
274
                FailoverEndTime:             domainInfo.FailoverEndTimestamp,
3,096✔
275
                LastUpdatedTime:             domainInfo.GetLastUpdatedTimestamp(),
3,096✔
276
        }, nil
3,096✔
277
}
278

279
func (m *sqlDomainStore) UpdateDomain(
280
        ctx context.Context,
281
        request *persistence.InternalUpdateDomainRequest,
282
) error {
×
283

×
284
        clusters := make([]string, len(request.ReplicationConfig.Clusters))
×
285
        for i := range clusters {
×
286
                clusters[i] = request.ReplicationConfig.Clusters[i].ClusterName
×
287
        }
×
288

289
        var badBinaries []byte
×
290
        badBinariesEncoding := string(common.EncodingTypeEmpty)
×
291
        if request.Config.BadBinaries != nil {
×
292
                badBinaries = request.Config.BadBinaries.Data
×
293
                badBinariesEncoding = string(request.Config.BadBinaries.GetEncoding())
×
294
        }
×
295

296
        var isolationGroups []byte
×
297
        isolationGroupsEncoding := string(common.EncodingTypeEmpty)
×
298
        if request.Config.IsolationGroups != nil {
×
299
                isolationGroups = request.Config.IsolationGroups.Data
×
300
                isolationGroupsEncoding = request.Config.IsolationGroups.GetEncodingString()
×
301
        }
×
302

303
        domainInfo := &serialization.DomainInfo{
×
304
                Status:                      int32(request.Info.Status),
×
305
                Description:                 request.Info.Description,
×
306
                Owner:                       request.Info.OwnerEmail,
×
307
                Data:                        request.Info.Data,
×
308
                Retention:                   request.Config.Retention,
×
309
                EmitMetric:                  request.Config.EmitMetric,
×
310
                ArchivalBucket:              request.Config.ArchivalBucket,
×
311
                ArchivalStatus:              int16(request.Config.ArchivalStatus),
×
312
                HistoryArchivalStatus:       int16(request.Config.HistoryArchivalStatus),
×
313
                HistoryArchivalURI:          request.Config.HistoryArchivalURI,
×
314
                VisibilityArchivalStatus:    int16(request.Config.VisibilityArchivalStatus),
×
315
                VisibilityArchivalURI:       request.Config.VisibilityArchivalURI,
×
316
                ActiveClusterName:           request.ReplicationConfig.ActiveClusterName,
×
317
                Clusters:                    clusters,
×
318
                ConfigVersion:               request.ConfigVersion,
×
319
                FailoverVersion:             request.FailoverVersion,
×
320
                NotificationVersion:         request.NotificationVersion,
×
321
                FailoverNotificationVersion: request.FailoverNotificationVersion,
×
322
                PreviousFailoverVersion:     request.PreviousFailoverVersion,
×
323
                FailoverEndTimestamp:        request.FailoverEndTime,
×
324
                LastUpdatedTimestamp:        request.LastUpdatedTime,
×
325
                BadBinaries:                 badBinaries,
×
326
                BadBinariesEncoding:         badBinariesEncoding,
×
327
                IsolationGroups:             isolationGroups,
×
328
                IsolationGroupsEncoding:     isolationGroupsEncoding,
×
329
        }
×
330

×
331
        blob, err := m.parser.DomainInfoToBlob(domainInfo)
×
332
        if err != nil {
×
333
                return err
×
334
        }
×
335

336
        return m.txExecute(ctx, sqlplugin.DbDefaultShard, "UpdateDomain", func(tx sqlplugin.Tx) error {
×
337
                result, err := tx.UpdateDomain(ctx, &sqlplugin.DomainRow{
×
338
                        Name:         request.Info.Name,
×
339
                        ID:           serialization.MustParseUUID(request.Info.ID),
×
340
                        Data:         blob.Data,
×
341
                        DataEncoding: string(blob.Encoding),
×
342
                })
×
343
                if err != nil {
×
344
                        return err
×
345
                }
×
346
                noRowsAffected, err := result.RowsAffected()
×
347
                if err != nil {
×
348
                        return fmt.Errorf("rowsAffected error: %v", err)
×
349
                }
×
350
                if noRowsAffected != 1 {
×
351
                        return fmt.Errorf("%v rows updated instead of one", noRowsAffected)
×
352
                }
×
353
                if err := lockMetadata(ctx, tx); err != nil {
×
354
                        return err
×
355
                }
×
356
                return updateMetadata(ctx, tx, request.NotificationVersion)
×
357
        })
358
}
359

360
func (m *sqlDomainStore) DeleteDomain(
361
        ctx context.Context,
362
        request *persistence.DeleteDomainRequest,
363
) error {
×
364
        return m.txExecute(ctx, sqlplugin.DbDefaultShard, "DeleteDomain", func(tx sqlplugin.Tx) error {
×
365
                _, err := tx.DeleteFromDomain(ctx, &sqlplugin.DomainFilter{ID: serialization.UUIDPtr(serialization.MustParseUUID(request.ID))})
×
366
                return err
×
367
        })
×
368
}
369

370
func (m *sqlDomainStore) DeleteDomainByName(
371
        ctx context.Context,
372
        request *persistence.DeleteDomainByNameRequest,
373
) error {
×
374
        return m.txExecute(ctx, sqlplugin.DbDefaultShard, "DeleteDomainByName", func(tx sqlplugin.Tx) error {
×
375
                _, err := tx.DeleteFromDomain(ctx, &sqlplugin.DomainFilter{Name: &request.Name})
×
376
                return err
×
377
        })
×
378
}
379

380
func (m *sqlDomainStore) GetMetadata(
381
        ctx context.Context,
382
) (*persistence.GetMetadataResponse, error) {
666✔
383
        row, err := m.db.SelectFromDomainMetadata(ctx)
666✔
384
        if err != nil {
666✔
385
                return nil, convertCommonErrors(m.db, "GetMetadata", "", err)
×
386
        }
×
387
        return &persistence.GetMetadataResponse{NotificationVersion: row.NotificationVersion}, nil
666✔
388
}
389

390
func (m *sqlDomainStore) ListDomains(
391
        ctx context.Context,
392
        request *persistence.ListDomainsRequest,
393
) (*persistence.InternalListDomainsResponse, error) {
626✔
394
        var pageToken *serialization.UUID
626✔
395
        if request.NextPageToken != nil {
626✔
396
                token := serialization.UUID(request.NextPageToken)
×
397
                pageToken = &token
×
398
        }
×
399
        rows, err := m.db.SelectFromDomain(ctx, &sqlplugin.DomainFilter{
626✔
400
                GreaterThanID: pageToken,
626✔
401
                PageSize:      &request.PageSize,
626✔
402
        })
626✔
403
        if err != nil {
626✔
404
                if err == sql.ErrNoRows {
×
405
                        return &persistence.InternalListDomainsResponse{}, nil
×
406
                }
×
407
                return nil, convertCommonErrors(m.db, "ListDomains", "Failed to get domain rows.", err)
×
408
        }
409

410
        var domains []*persistence.InternalGetDomainResponse
626✔
411
        for _, row := range rows {
3,636✔
412
                resp, err := m.domainRowToGetDomainResponse(&row)
3,010✔
413
                if err != nil {
3,010✔
414
                        return nil, err
×
415
                }
×
416
                domains = append(domains, resp)
3,010✔
417
        }
418

419
        resp := &persistence.InternalListDomainsResponse{Domains: domains}
626✔
420
        if len(rows) >= request.PageSize {
626✔
421
                resp.NextPageToken = rows[len(rows)-1].ID
×
422
        }
×
423

424
        return resp, nil
626✔
425
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc