• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zalando / postgres-operator / 13540210091

26 Feb 2025 09:04AM UTC coverage: 45.512% (+0.06%) from 45.452%
13540210091

Pull #2868

github

web-flow
Merge ad75935a9 into 2a4be1cb3
Pull Request #2868: do not remove publications of slot defined in manifest

1 of 20 new or added lines in 3 files covered. (5.0%)

28 existing lines in 1 file now uncovered.

7023 of 15431 relevant lines covered (45.51%)

29.15 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.74
/pkg/cluster/streams.go
1
package cluster
2

3
import (
4
        "context"
5
        "encoding/json"
6
        "fmt"
7
        "reflect"
8
        "sort"
9
        "strings"
10

11
        acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
12
        zalandov1 "github.com/zalando/postgres-operator/pkg/apis/zalando.org/v1"
13
        "github.com/zalando/postgres-operator/pkg/util"
14
        "github.com/zalando/postgres-operator/pkg/util/constants"
15
        "github.com/zalando/postgres-operator/pkg/util/k8sutil"
16
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
17
        "k8s.io/apimachinery/pkg/types"
18
)
19

20
func (c *Cluster) createStreams(appId string) (*zalandov1.FabricEventStream, error) {
3✔
21
        c.setProcessName("creating streams")
3✔
22

3✔
23
        fes := c.generateFabricEventStream(appId)
3✔
24
        streamCRD, err := c.KubeClient.FabricEventStreams(c.Namespace).Create(context.TODO(), fes, metav1.CreateOptions{})
3✔
25
        if err != nil {
3✔
26
                return nil, err
×
27
        }
×
28

29
        return streamCRD, nil
3✔
30
}
31

32
func (c *Cluster) updateStreams(newEventStreams *zalandov1.FabricEventStream) (patchedStream *zalandov1.FabricEventStream, err error) {
5✔
33
        c.setProcessName("updating event streams")
5✔
34

5✔
35
        patch, err := json.Marshal(newEventStreams)
5✔
36
        if err != nil {
5✔
37
                return nil, fmt.Errorf("could not marshal new event stream CRD %q: %v", newEventStreams.Name, err)
×
38
        }
×
39
        if patchedStream, err = c.KubeClient.FabricEventStreams(newEventStreams.Namespace).Patch(
5✔
40
                context.TODO(), newEventStreams.Name, types.MergePatchType, patch, metav1.PatchOptions{}); err != nil {
5✔
41
                return nil, err
×
42
        }
×
43

44
        return patchedStream, nil
5✔
45
}
46

47
func (c *Cluster) deleteStream(appId string) error {
2✔
48
        c.setProcessName("deleting event stream")
2✔
49
        c.logger.Debugf("deleting event stream with applicationId %s", appId)
2✔
50

2✔
51
        err := c.KubeClient.FabricEventStreams(c.Streams[appId].Namespace).Delete(context.TODO(), c.Streams[appId].Name, metav1.DeleteOptions{})
2✔
52
        if err != nil {
2✔
53
                return fmt.Errorf("could not delete event stream %q with applicationId %s: %v", c.Streams[appId].Name, appId, err)
×
54
        }
×
55
        c.logger.Infof("event stream %q with applicationId %s has been successfully deleted", c.Streams[appId].Name, appId)
2✔
56
        delete(c.Streams, appId)
2✔
57

2✔
58
        return nil
2✔
59
}
60

61
func (c *Cluster) deleteStreams() error {
1✔
62
        // check if stream CRD is installed before trying a delete
1✔
63
        _, err := c.KubeClient.CustomResourceDefinitions().Get(context.TODO(), constants.EventStreamCRDName, metav1.GetOptions{})
1✔
64
        if k8sutil.ResourceNotFound(err) {
1✔
65
                return nil
×
66
        }
×
67
        c.setProcessName("deleting event streams")
1✔
68
        errors := make([]string, 0)
1✔
69

1✔
70
        for appId := range c.Streams {
2✔
71
                err := c.deleteStream(appId)
1✔
72
                if err != nil {
1✔
73
                        errors = append(errors, fmt.Sprintf("%v", err))
×
74
                }
×
75
        }
76

77
        if len(errors) > 0 {
1✔
78
                return fmt.Errorf("could not delete all event stream custom resources: %v", strings.Join(errors, `', '`))
×
79
        }
×
80

81
        return nil
1✔
82
}
83

84
func getDistinctApplicationIds(streams []acidv1.Stream) []string {
2✔
85
        appIds := make([]string, 0)
2✔
86
        for _, stream := range streams {
3✔
87
                if !util.SliceContains(appIds, stream.ApplicationId) {
2✔
88
                        appIds = append(appIds, stream.ApplicationId)
1✔
89
                }
1✔
90
        }
91

92
        return appIds
2✔
93
}
94

95
func (c *Cluster) syncPublication(dbName string, databaseSlotsList map[string]zalandov1.Slot, slotsToSync *map[string]map[string]string) error {
×
96
        createPublications := make(map[string]string)
×
97
        alterPublications := make(map[string]string)
×
98
        deletePublications := []string{}
×
99

×
100
        defer func() {
×
101
                if err := c.closeDbConn(); err != nil {
×
102
                        c.logger.Errorf("could not close database connection: %v", err)
×
103
                }
×
104
        }()
105

106
        // check for existing publications
107
        if err := c.initDbConnWithName(dbName); err != nil {
×
108
                return fmt.Errorf("could not init database connection: %v", err)
×
109
        }
×
110

111
        currentPublications, err := c.getPublications()
×
112
        if err != nil {
×
113
                return fmt.Errorf("could not get current publications: %v", err)
×
114
        }
×
115

116
        for slotName, slotAndPublication := range databaseSlotsList {
×
NEW
117
                newTables := slotAndPublication.Publication
×
NEW
118
                tableNames := make([]string, len(newTables))
×
119
                i := 0
×
NEW
120
                for t := range newTables {
×
121
                        tableName, schemaName := getTableSchema(t)
×
122
                        tableNames[i] = fmt.Sprintf("%s.%s", schemaName, tableName)
×
123
                        i++
×
124
                }
×
125
                sort.Strings(tableNames)
×
126
                tableList := strings.Join(tableNames, ", ")
×
127

×
128
                currentTables, exists := currentPublications[slotName]
×
NEW
129
                // if newTables is empty it means that it's definition was removed from streams section
×
NEW
130
                // but when slot is defined in manifest we should sync publications, too
×
NEW
131
                // by reusing current tables we make sure it is not
×
NEW
132
                if len(newTables) == 0 {
×
NEW
133
                        tableList = currentTables
×
NEW
134
                }
×
135
                if !exists {
×
136
                        createPublications[slotName] = tableList
×
137
                } else if currentTables != tableList {
×
138
                        alterPublications[slotName] = tableList
×
139
                } else {
×
140
                        (*slotsToSync)[slotName] = slotAndPublication.Slot
×
141
                }
×
142
        }
143

144
        // check if there is any deletion
145
        for slotName := range currentPublications {
×
146
                if _, exists := databaseSlotsList[slotName]; !exists {
×
147
                        deletePublications = append(deletePublications, slotName)
×
148
                }
×
149
        }
150

151
        if len(createPublications)+len(alterPublications)+len(deletePublications) == 0 {
×
152
                return nil
×
153
        }
×
154

155
        errors := make([]string, 0)
×
156
        for publicationName, tables := range createPublications {
×
157
                if err = c.executeCreatePublication(publicationName, tables); err != nil {
×
158
                        errors = append(errors, fmt.Sprintf("creation of publication %q failed: %v", publicationName, err))
×
159
                        continue
×
160
                }
161
                (*slotsToSync)[publicationName] = databaseSlotsList[publicationName].Slot
×
162
        }
163
        for publicationName, tables := range alterPublications {
×
164
                if err = c.executeAlterPublication(publicationName, tables); err != nil {
×
165
                        errors = append(errors, fmt.Sprintf("update of publication %q failed: %v", publicationName, err))
×
166
                        continue
×
167
                }
168
                (*slotsToSync)[publicationName] = databaseSlotsList[publicationName].Slot
×
169
        }
170
        for _, publicationName := range deletePublications {
×
171
                if err = c.executeDropPublication(publicationName); err != nil {
×
172
                        errors = append(errors, fmt.Sprintf("deletion of publication %q failed: %v", publicationName, err))
×
173
                        continue
×
174
                }
175
                (*slotsToSync)[publicationName] = nil
×
176
        }
177

178
        if len(errors) > 0 {
×
179
                return fmt.Errorf("%v", strings.Join(errors, `', '`))
×
180
        }
×
181

182
        return nil
×
183
}
184

185
func (c *Cluster) generateFabricEventStream(appId string) *zalandov1.FabricEventStream {
18✔
186
        eventStreams := make([]zalandov1.EventStream, 0)
18✔
187
        resourceAnnotations := map[string]string{}
18✔
188
        var err, err2 error
18✔
189

18✔
190
        for _, stream := range c.Spec.Streams {
36✔
191
                if stream.ApplicationId != appId {
18✔
192
                        continue
×
193
                }
194

195
                err = setResourceAnnotation(&resourceAnnotations, stream.CPU, constants.EventStreamCpuAnnotationKey)
18✔
196
                err2 = setResourceAnnotation(&resourceAnnotations, stream.Memory, constants.EventStreamMemoryAnnotationKey)
18✔
197
                if err != nil || err2 != nil {
18✔
198
                        c.logger.Warningf("could not set resource annotation for event stream: %v", err)
×
199
                }
×
200

201
                for tableName, table := range stream.Tables {
72✔
202
                        streamSource := c.getEventStreamSource(stream, tableName, table.IdColumn)
54✔
203
                        streamFlow := getEventStreamFlow(table.PayloadColumn)
54✔
204
                        streamSink := getEventStreamSink(stream, table.EventType)
54✔
205
                        streamRecovery := getEventStreamRecovery(stream, table.RecoveryEventType, table.EventType, table.IgnoreRecovery)
54✔
206

54✔
207
                        eventStreams = append(eventStreams, zalandov1.EventStream{
54✔
208
                                EventStreamFlow:     streamFlow,
54✔
209
                                EventStreamRecovery: streamRecovery,
54✔
210
                                EventStreamSink:     streamSink,
54✔
211
                                EventStreamSource:   streamSource})
54✔
212
                }
54✔
213
        }
214

215
        return &zalandov1.FabricEventStream{
18✔
216
                TypeMeta: metav1.TypeMeta{
18✔
217
                        APIVersion: constants.EventStreamCRDApiVersion,
18✔
218
                        Kind:       constants.EventStreamCRDKind,
18✔
219
                },
18✔
220
                ObjectMeta: metav1.ObjectMeta{
18✔
221
                        // max length for cluster name is 58 so we can only add 5 more characters / numbers
18✔
222
                        Name:            fmt.Sprintf("%s-%s", c.Name, strings.ToLower(util.RandomPassword(5))),
18✔
223
                        Namespace:       c.Namespace,
18✔
224
                        Labels:          c.labelsSet(true),
18✔
225
                        Annotations:     c.AnnotationsToPropagate(c.annotationsSet(resourceAnnotations)),
18✔
226
                        OwnerReferences: c.ownerReferences(),
18✔
227
                },
18✔
228
                Spec: zalandov1.FabricEventStreamSpec{
18✔
229
                        ApplicationId: appId,
18✔
230
                        EventStreams:  eventStreams,
18✔
231
                },
18✔
232
        }
18✔
233
}
234

235
func setResourceAnnotation(annotations *map[string]string, resource *string, key string) error {
36✔
236
        var (
36✔
237
                isSmaller bool
36✔
238
                err       error
36✔
239
        )
36✔
240
        if resource != nil {
54✔
241
                currentValue, exists := (*annotations)[key]
18✔
242
                if exists {
18✔
243
                        isSmaller, err = util.IsSmallerQuantity(currentValue, *resource)
×
244
                        if err != nil {
×
245
                                return fmt.Errorf("could not compare resource in %q annotation: %v", key, err)
×
246
                        }
×
247
                }
248
                if isSmaller || !exists {
36✔
249
                        (*annotations)[key] = *resource
18✔
250
                }
18✔
251
        }
252

253
        return nil
36✔
254
}
255

256
func (c *Cluster) getEventStreamSource(stream acidv1.Stream, tableName string, idColumn *string) zalandov1.EventStreamSource {
54✔
257
        table, schema := getTableSchema(tableName)
54✔
258
        streamFilter := stream.Filter[tableName]
54✔
259
        return zalandov1.EventStreamSource{
54✔
260
                Type:             constants.EventStreamSourcePGType,
54✔
261
                Schema:           schema,
54✔
262
                EventStreamTable: getOutboxTable(table, idColumn),
54✔
263
                Filter:           streamFilter,
54✔
264
                Connection: c.getStreamConnection(
54✔
265
                        stream.Database,
54✔
266
                        constants.EventStreamSourceSlotPrefix+constants.UserRoleNameSuffix,
54✔
267
                        stream.ApplicationId),
54✔
268
        }
54✔
269
}
54✔
270

271
func getEventStreamFlow(payloadColumn *string) zalandov1.EventStreamFlow {
54✔
272
        return zalandov1.EventStreamFlow{
54✔
273
                Type:          constants.EventStreamFlowPgGenericType,
54✔
274
                PayloadColumn: payloadColumn,
54✔
275
        }
54✔
276
}
54✔
277

278
func getEventStreamSink(stream acidv1.Stream, eventType string) zalandov1.EventStreamSink {
54✔
279
        return zalandov1.EventStreamSink{
54✔
280
                Type:         constants.EventStreamSinkNakadiType,
54✔
281
                EventType:    eventType,
54✔
282
                MaxBatchSize: stream.BatchSize,
54✔
283
        }
54✔
284
}
54✔
285

286
func getEventStreamRecovery(stream acidv1.Stream, recoveryEventType, eventType string, ignoreRecovery *bool) zalandov1.EventStreamRecovery {
54✔
287
        if (stream.EnableRecovery != nil && !*stream.EnableRecovery) ||
54✔
288
                (stream.EnableRecovery == nil && recoveryEventType == "") {
84✔
289
                return zalandov1.EventStreamRecovery{
30✔
290
                        Type: constants.EventStreamRecoveryNoneType,
30✔
291
                }
30✔
292
        }
30✔
293

294
        if ignoreRecovery != nil && *ignoreRecovery {
32✔
295
                return zalandov1.EventStreamRecovery{
8✔
296
                        Type: constants.EventStreamRecoveryIgnoreType,
8✔
297
                }
8✔
298
        }
8✔
299

300
        if stream.EnableRecovery != nil && *stream.EnableRecovery && recoveryEventType == "" {
24✔
301
                recoveryEventType = fmt.Sprintf("%s-%s", eventType, constants.EventStreamRecoverySuffix)
8✔
302
        }
8✔
303

304
        return zalandov1.EventStreamRecovery{
16✔
305
                Type: constants.EventStreamRecoveryDLQType,
16✔
306
                Sink: &zalandov1.EventStreamSink{
16✔
307
                        Type:         constants.EventStreamSinkNakadiType,
16✔
308
                        EventType:    recoveryEventType,
16✔
309
                        MaxBatchSize: stream.BatchSize,
16✔
310
                },
16✔
311
        }
16✔
312
}
313

314
// getTableSchema splits a possibly schema-qualified table name.
//
// Without a dot the whole input is the table name and the schema defaults to
// "public". With dots, the first segment is the schema and the second segment
// is the table name; any further segments are ignored.
func getTableSchema(fullTableName string) (tableName, schemaName string) {
	segments := strings.Split(fullTableName, ".")
	if len(segments) < 2 {
		return fullTableName, "public"
	}

	return segments[1], segments[0]
}
324

325
func getOutboxTable(tableName string, idColumn *string) zalandov1.EventStreamTable {
54✔
326
        return zalandov1.EventStreamTable{
54✔
327
                Name:     tableName,
54✔
328
                IDColumn: idColumn,
54✔
329
        }
54✔
330
}
54✔
331

332
func getSlotName(dbName, appId string) string {
63✔
333
        return fmt.Sprintf("%s_%s_%s", constants.EventStreamSourceSlotPrefix, dbName, strings.Replace(appId, "-", "_", -1))
63✔
334
}
63✔
335

336
func (c *Cluster) getStreamConnection(database, user, appId string) zalandov1.Connection {
54✔
337
        return zalandov1.Connection{
54✔
338
                Url:        fmt.Sprintf("jdbc:postgresql://%s.%s/%s?user=%s&ssl=true&sslmode=require", c.Name, c.Namespace, database, user),
54✔
339
                SlotName:   getSlotName(database, appId),
54✔
340
                PluginType: constants.EventStreamSourcePluginType,
54✔
341
                DBAuth: zalandov1.DBAuth{
54✔
342
                        Type:        constants.EventStreamSourceAuthType,
54✔
343
                        Name:        c.credentialSecretNameForCluster(user, c.Name),
54✔
344
                        UserKey:     "username",
54✔
345
                        PasswordKey: "password",
54✔
346
                },
54✔
347
        }
54✔
348
}
54✔
349

350
func (c *Cluster) syncStreams() error {
×
351
        c.setProcessName("syncing streams")
×
352

×
353
        _, err := c.KubeClient.CustomResourceDefinitions().Get(context.TODO(), constants.EventStreamCRDName, metav1.GetOptions{})
×
354
        if k8sutil.ResourceNotFound(err) {
×
355
                c.logger.Debug("event stream CRD not installed, skipping")
×
356
                return nil
×
357
        }
×
358

359
        // create map with every database and empty slot defintion
360
        // we need it to detect removal of streams from databases
361
        if err := c.initDbConn(); err != nil {
×
362
                return fmt.Errorf("could not init database connection")
×
363
        }
×
364
        defer func() {
×
365
                if err := c.closeDbConn(); err != nil {
×
366
                        c.logger.Errorf("could not close database connection: %v", err)
×
367
                }
×
368
        }()
369
        listDatabases, err := c.getDatabases()
×
370
        if err != nil {
×
371
                return fmt.Errorf("could not get list of databases: %v", err)
×
372
        }
×
NEW
373
        databaseSlots := make(map[string]map[string]zalandov1.Slot)
×
374
        for dbName := range listDatabases {
×
375
                if dbName != "template0" && dbName != "template1" {
×
376
                        databaseSlots[dbName] = map[string]zalandov1.Slot{}
×
377
                }
×
378
        }
379

380
        // need to take explicitly defined slots into account whey syncing Patroni config
NEW
381
        slotsToSync := make(map[string]map[string]string)
×
NEW
382
        requiredPatroniConfig := c.Spec.Patroni
×
NEW
383
        if len(requiredPatroniConfig.Slots) > 0 {
×
NEW
384
                for slotName, slotConfig := range requiredPatroniConfig.Slots {
×
NEW
385
                        slotsToSync[slotName] = slotConfig
×
NEW
386
                        if _, exists := databaseSlots[slotConfig["database"]]; exists {
×
NEW
387
                                databaseSlots[slotConfig["database"]][slotName] = zalandov1.Slot{Slot: slotConfig}
×
NEW
388
                        }
×
389
                }
390
        }
391

392
        // get list of required slots and publications, group by database
393
        for _, stream := range c.Spec.Streams {
×
394
                if _, exists := databaseSlots[stream.Database]; !exists {
×
395
                        c.logger.Warningf("database %q does not exist in the cluster", stream.Database)
×
396
                        continue
×
397
                }
398
                slot := map[string]string{
×
399
                        "database": stream.Database,
×
UNCOV
400
                        "plugin":   constants.EventStreamSourcePluginType,
×
401
                        "type":     "logical",
×
402
                }
×
403
                slotName := getSlotName(stream.Database, stream.ApplicationId)
×
404
                if _, exists := databaseSlots[stream.Database][slotName]; !exists {
×
405
                        databaseSlots[stream.Database][slotName] = zalandov1.Slot{
×
406
                                Slot:        slot,
×
407
                                Publication: stream.Tables,
×
408
                        }
×
409
                } else {
×
410
                        slotAndPublication := databaseSlots[stream.Database][slotName]
×
411
                        streamTables := slotAndPublication.Publication
×
412
                        for tableName, table := range stream.Tables {
×
413
                                if _, exists := streamTables[tableName]; !exists {
×
414
                                        streamTables[tableName] = table
×
415
                                }
×
416
                        }
417
                        slotAndPublication.Publication = streamTables
×
418
                        databaseSlots[stream.Database][slotName] = slotAndPublication
×
419
                }
420
        }
421

422
        // sync publication in a database
UNCOV
423
        c.logger.Debug("syncing database publications")
×
UNCOV
424
        for dbName, databaseSlotsList := range databaseSlots {
×
UNCOV
425
                err := c.syncPublication(dbName, databaseSlotsList, &slotsToSync)
×
426
                if err != nil {
×
427
                        c.logger.Warningf("could not sync all publications in database %q: %v", dbName, err)
×
428
                        continue
×
429
                }
430
        }
431

UNCOV
432
        c.logger.Debug("syncing logical replication slots")
×
UNCOV
433
        pods, err := c.listPods()
×
UNCOV
434
        if err != nil {
×
435
                return fmt.Errorf("could not get list of pods to sync logical replication slots via Patroni API: %v", err)
×
436
        }
×
437

438
        // sync logical replication slots in Patroni config
439
        requiredPatroniConfig.Slots = slotsToSync
×
UNCOV
440
        configPatched, _, _, err := c.syncPatroniConfig(pods, requiredPatroniConfig, nil)
×
UNCOV
441
        if err != nil {
×
442
                c.logger.Warningf("Patroni config updated? %v - errors during config sync: %v", configPatched, err)
×
443
        }
×
444

445
        // finally sync stream CRDs
446
        // get distinct application IDs from streams section
447
        // there will be a separate event stream resource for each ID
UNCOV
448
        appIds := getDistinctApplicationIds(c.Spec.Streams)
×
UNCOV
449
        for _, appId := range appIds {
×
UNCOV
450
                if hasSlotsInSync(appId, databaseSlots, slotsToSync) {
×
451
                        if err = c.syncStream(appId); err != nil {
×
452
                                c.logger.Warningf("could not sync event streams with applicationId %s: %v", appId, err)
×
453
                        }
×
454
                } else {
×
455
                        c.logger.Warningf("database replication slots %#v for streams with applicationId %s not in sync, skipping event stream sync", slotsToSync, appId)
×
456
                }
×
457
        }
458

459
        // check if there is any deletion
UNCOV
460
        if err = c.cleanupRemovedStreams(appIds); err != nil {
×
UNCOV
461
                return fmt.Errorf("%v", err)
×
UNCOV
462
        }
×
463

464
        return nil
×
465
}
466

467
func hasSlotsInSync(appId string, databaseSlots map[string]map[string]zalandov1.Slot, slotsToSync map[string]map[string]string) bool {
6✔
468
        allSlotsInSync := true
6✔
469
        for dbName, slots := range databaseSlots {
15✔
470
                for slotName := range slots {
18✔
471
                        if slotName == getSlotName(dbName, appId) {
16✔
472
                                if slot, exists := slotsToSync[slotName]; !exists || slot == nil {
11✔
473
                                        allSlotsInSync = false
4✔
474
                                        continue
4✔
475
                                }
476
                        }
477
                }
478
        }
479

480
        return allSlotsInSync
6✔
481
}
482

483
func (c *Cluster) syncStream(appId string) error {
12✔
484
        var (
12✔
485
                streams *zalandov1.FabricEventStreamList
12✔
486
                err     error
12✔
487
        )
12✔
488
        c.setProcessName("syncing stream with applicationId %s", appId)
12✔
489
        c.logger.Debugf("syncing stream with applicationId %s", appId)
12✔
490

12✔
491
        listOptions := metav1.ListOptions{
12✔
492
                LabelSelector: c.labelsSet(false).String(),
12✔
493
        }
12✔
494
        streams, err = c.KubeClient.FabricEventStreams(c.Namespace).List(context.TODO(), listOptions)
12✔
495
        if err != nil {
12✔
UNCOV
496
                return fmt.Errorf("could not list of FabricEventStreams for applicationId %s: %v", appId, err)
×
UNCOV
497
        }
×
498

499
        streamExists := false
12✔
500
        for _, stream := range streams.Items {
21✔
501
                if stream.Spec.ApplicationId != appId {
9✔
UNCOV
502
                        continue
×
503
                }
504
                streamExists = true
9✔
505
                desiredStreams := c.generateFabricEventStream(appId)
9✔
506
                if !reflect.DeepEqual(stream.ObjectMeta.OwnerReferences, desiredStreams.ObjectMeta.OwnerReferences) {
11✔
507
                        c.logger.Infof("owner references of event streams with applicationId %s do not match the current ones", appId)
2✔
508
                        stream.ObjectMeta.OwnerReferences = desiredStreams.ObjectMeta.OwnerReferences
2✔
509
                        c.setProcessName("updating event streams with applicationId %s", appId)
2✔
510
                        stream, err := c.KubeClient.FabricEventStreams(stream.Namespace).Update(context.TODO(), &stream, metav1.UpdateOptions{})
2✔
511
                        if err != nil {
2✔
UNCOV
512
                                return fmt.Errorf("could not update event streams with applicationId %s: %v", appId, err)
×
UNCOV
513
                        }
×
514
                        c.Streams[appId] = stream
2✔
515
                }
516
                if match, reason := c.compareStreams(&stream, desiredStreams); !match {
14✔
517
                        c.logger.Infof("updating event streams with applicationId %s: %s", appId, reason)
5✔
518
                        // make sure to keep the old name with randomly generated suffix
5✔
519
                        desiredStreams.ObjectMeta.Name = stream.ObjectMeta.Name
5✔
520
                        updatedStream, err := c.updateStreams(desiredStreams)
5✔
521
                        if err != nil {
5✔
UNCOV
522
                                return fmt.Errorf("failed updating event streams %s with applicationId %s: %v", stream.Name, appId, err)
×
UNCOV
523
                        }
×
524
                        c.Streams[appId] = updatedStream
5✔
525
                        c.logger.Infof("event streams %q with applicationId %s have been successfully updated", updatedStream.Name, appId)
5✔
526
                }
527
                break
9✔
528
        }
529

530
        if !streamExists {
15✔
531
                c.logger.Infof("event streams with applicationId %s do not exist, create it", appId)
3✔
532
                createdStream, err := c.createStreams(appId)
3✔
533
                if err != nil {
3✔
UNCOV
534
                        return fmt.Errorf("failed creating event streams with applicationId %s: %v", appId, err)
×
UNCOV
535
                }
×
536
                c.logger.Infof("event streams %q have been successfully created", createdStream.Name)
3✔
537
                c.Streams[appId] = createdStream
3✔
538
        }
539

540
        return nil
12✔
541
}
542

543
func (c *Cluster) compareStreams(curEventStreams, newEventStreams *zalandov1.FabricEventStream) (match bool, reason string) {
25✔
544
        reasons := make([]string, 0)
25✔
545
        desiredAnnotations := make(map[string]string)
25✔
546
        match = true
25✔
547

25✔
548
        // stream operator can add extra annotations so incl. current annotations in desired annotations
25✔
549
        for curKey, curValue := range curEventStreams.Annotations {
44✔
550
                if _, exists := desiredAnnotations[curKey]; !exists {
38✔
551
                        desiredAnnotations[curKey] = curValue
19✔
552
                }
19✔
553
        }
554
        // add/or override annotations if cpu and memory values were changed
555
        for newKey, newValue := range newEventStreams.Annotations {
46✔
556
                desiredAnnotations[newKey] = newValue
21✔
557
        }
21✔
558
        if changed, reason := c.compareAnnotations(curEventStreams.ObjectMeta.Annotations, desiredAnnotations, nil); changed {
28✔
559
                match = false
3✔
560
                reasons = append(reasons, fmt.Sprintf("new streams annotations do not match: %s", reason))
3✔
561
        }
3✔
562

563
        if !reflect.DeepEqual(curEventStreams.ObjectMeta.Labels, newEventStreams.ObjectMeta.Labels) {
28✔
564
                match = false
3✔
565
                reasons = append(reasons, "new streams labels do not match the current ones")
3✔
566
        }
3✔
567

568
        if changed, reason := sameEventStreams(curEventStreams.Spec.EventStreams, newEventStreams.Spec.EventStreams); !changed {
33✔
569
                match = false
8✔
570
                reasons = append(reasons, fmt.Sprintf("new streams EventStreams array does not match : %s", reason))
8✔
571
        }
8✔
572

573
        return match, strings.Join(reasons, ", ")
25✔
574
}
575

576
func sameEventStreams(curEventStreams, newEventStreams []zalandov1.EventStream) (match bool, reason string) {
25✔
577
        if len(newEventStreams) != len(curEventStreams) {
28✔
578
                return false, "number of defined streams is different"
3✔
579
        }
3✔
580

581
        for _, newStream := range newEventStreams {
72✔
582
                match = false
50✔
583
                reason = "event stream specs differ"
50✔
584
                for _, curStream := range curEventStreams {
149✔
585
                        if reflect.DeepEqual(newStream.EventStreamSource, curStream.EventStreamSource) &&
99✔
586
                                reflect.DeepEqual(newStream.EventStreamFlow, curStream.EventStreamFlow) &&
99✔
587
                                reflect.DeepEqual(newStream.EventStreamSink, curStream.EventStreamSink) &&
99✔
588
                                reflect.DeepEqual(newStream.EventStreamRecovery, curStream.EventStreamRecovery) {
144✔
589
                                match = true
45✔
590
                                break
45✔
591
                        }
592
                }
593
                if !match {
55✔
594
                        return false, reason
5✔
595
                }
5✔
596
        }
597

598
        return true, ""
17✔
599
}
600

601
func (c *Cluster) cleanupRemovedStreams(appIds []string) error {
1✔
602
        errors := make([]string, 0)
1✔
603
        for appId := range c.Streams {
2✔
604
                if !util.SliceContains(appIds, appId) {
2✔
605
                        c.logger.Infof("event streams with applicationId %s do not exist in the manifest, delete it", appId)
1✔
606
                        err := c.deleteStream(appId)
1✔
607
                        if err != nil {
1✔
UNCOV
608
                                errors = append(errors, fmt.Sprintf("failed deleting event streams with applicationId %s: %v", appId, err))
×
UNCOV
609
                        }
×
610
                }
611
        }
612

613
        if len(errors) > 0 {
1✔
UNCOV
614
                return fmt.Errorf("could not delete all removed event streams: %v", strings.Join(errors, `', '`))
×
UNCOV
615
        }
×
616

617
        return nil
1✔
618
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc