• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mindersec / minder / 12583378746

02 Jan 2025 01:44PM UTC coverage: 55.135%. First build
12583378746

Pull #4829

github

web-flow
Merge 88258024f into 4cd5e165e
Pull Request #4829: Add NATS publisher support to reminder

17 of 32 new or added lines in 4 files covered. (53.13%)

16980 of 30797 relevant lines covered (55.14%)

38.03 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.0
/internal/events/nats/natschannel.go
1
// SPDX-FileCopyrightText: Copyright 2024 The Minder Authors
2
// SPDX-License-Identifier: Apache-2.0
3

4
// Package nats provides a nats+cloudevents implementation of the eventer interface
5
package nats
6

7
import (
8
        "context"
9
        "encoding/json"
10
        "fmt"
11
        "strings"
12
        "sync"
13

14
        "github.com/ThreeDotsLabs/watermill/message"
15
        ce_observability "github.com/cloudevents/sdk-go/observability/opentelemetry/v2/client"
16
        cejsm "github.com/cloudevents/sdk-go/protocol/nats_jetstream/v2"
17
        cloudevents "github.com/cloudevents/sdk-go/v2"
18
        "github.com/cloudevents/sdk-go/v2/client"
19
        "github.com/nats-io/nats.go"
20
        "github.com/nats-io/nats.go/jetstream"
21
        "github.com/rs/zerolog"
22

23
        "github.com/mindersec/minder/internal/events/common"
24
        serverconfig "github.com/mindersec/minder/pkg/config/server"
25
)
26

27
// BuildNatsChannelDriver creates a new event driver using
28
// CloudEvents with the NATS-JetStream transport
29
func BuildNatsChannelDriver(cfg *serverconfig.EventConfig) (message.Publisher, message.Subscriber, common.DriverCloser, error) {
2✔
30
        adapter := &cloudEventsNatsAdapter{cfg: &cfg.Nats}
2✔
31
        return adapter, adapter, func() {}, nil
4✔
32
}
33

34
// CloudEventsNatsPublisher actually consumes a _set_ of NATS topics,
35
// because CloudEvents-Jetstream has a separate Consumer for each topic
36
type cloudEventsNatsAdapter struct {
37
        cfg  *serverconfig.NatsConfig
38
        lock sync.Mutex
39
        // Keep a cache of the topics we subscribe/publish to
40
        topics map[string]topicState
41
}
42

43
type topicState struct {
44
        ceProtocol cejsm.Protocol
45
        ceClient   cloudevents.Client
46
}
47

48
var _ message.Subscriber = (*cloudEventsNatsAdapter)(nil)
49

50
var _ message.Publisher = (*cloudEventsNatsAdapter)(nil)
51

52
// Close implements message.Subscriber and message.Publisher.
53
func (c *cloudEventsNatsAdapter) Close() error {
1✔
54
        zerolog.Ctx(context.Background()).Info().Msg("Closing NATS event driver")
1✔
55
        c.lock.Lock()
1✔
56
        defer c.lock.Unlock()
1✔
57
        // We want to try to close all the consumers, but some may fail
1✔
58
        var gotErr error
1✔
59
        for topic, state := range c.topics {
2✔
60
                err := state.ceProtocol.Close(context.Background())
1✔
61
                if err != nil {
1✔
62
                        gotErr = err
×
63
                } else {
1✔
64
                        delete(c.topics, topic)
1✔
65
                }
1✔
66
        }
67
        return gotErr
1✔
68
}
69

70
// Ensure that we have a consumer for this topic
71
func (c *cloudEventsNatsAdapter) ensureTopic(ctx context.Context, topic string, queue string) (*topicState, error) {
6✔
72
        c.lock.Lock()
6✔
73
        defer c.lock.Unlock()
6✔
74

6✔
75
        if c.topics == nil {
8✔
76
                c.topics = make(map[string]topicState)
2✔
77
        }
2✔
78

79
        state, ok := c.topics[topic]
6✔
80
        if ok {
9✔
81
                return &state, nil
3✔
82
        }
3✔
83
        opts := []nats.Option{
3✔
84
                nats.Name("minder"),
3✔
85
                // TODO: set TLS config
3✔
86
                // TODO: set UserJWT
3✔
87
        }
3✔
88
        jetstreamOpts := []nats.JSOpt{}
3✔
89
        subOpts := []nats.SubOpt{}
3✔
90
        // Pre-create the Stream to work around the SDK creating it as "stream.*" rather then "stream.>"
3✔
91
        // We can remove this after https://github.com/cloudevents/sdk-go/pull/1084 is merged and released
3✔
92
        if err := c.ensureStream(ctx); err != nil {
3✔
93
                return nil, err
×
94
        }
×
95

96
        consumer, err := cejsm.NewProtocol(
3✔
97
                c.cfg.URL, c.cfg.Prefix, topic, topic,
3✔
98
                opts, jetstreamOpts, subOpts,
3✔
99
                cejsm.WithConsumerOptions(cejsm.WithQueueSubscriber(queue)))
3✔
100
        if err != nil {
3✔
101
                return nil, err
×
102
        }
×
103

104
        ceSub, err := cloudevents.NewClient(consumer,
3✔
105
                client.WithObservabilityService(ce_observability.NewOTelObservabilityService()))
3✔
106
        if err != nil {
3✔
107
                _ = consumer.Close(ctx)
×
108
                return nil, err
×
109
        }
×
110
        state = topicState{
3✔
111
                ceProtocol: *consumer,
3✔
112
                ceClient:   ceSub,
3✔
113
        }
3✔
114
        c.topics[topic] = state
3✔
115
        return &state, nil
3✔
116
}
117

118
func (c *cloudEventsNatsAdapter) ensureStream(ctx context.Context) error {
3✔
119
        conn, err := nats.Connect(c.cfg.URL)
3✔
120
        if err != nil {
3✔
121
                return err
×
122
        }
×
123
        defer conn.Close()
3✔
124
        js, err := jetstream.New(conn)
3✔
125
        if err != nil {
3✔
126
                return err
×
127
        }
×
128
        _, err = js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
3✔
129
                Name:     c.cfg.Prefix,
3✔
130
                Subjects: []string{c.cfg.Prefix + ".>"},
3✔
131
        })
3✔
132
        return err
3✔
133
}
134

135
// Subscribe implements message.Subscriber.
136
func (c *cloudEventsNatsAdapter) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error) {
2✔
137
        subject := fmt.Sprintf("%s.%s", c.cfg.Prefix, topic)
2✔
138
        // consumer names cannot contain "."
2✔
139
        queueConsumer := strings.ReplaceAll(subject, ".", "-")
2✔
140

2✔
141
        // TODO: should we have separate maps for producer & consumer?  I've lazily combined them here.
2✔
142
        state, err := c.ensureTopic(ctx, subject, queueConsumer)
2✔
143
        if err != nil {
2✔
144
                return nil, err
×
145
        }
×
146

147
        out := make(chan *message.Message)
2✔
148
        go func() {
4✔
149
                err = state.ceClient.StartReceiver(ctx, convertCloudEventToMessage(out))
2✔
150
                if err != nil {
2✔
151
                        zerolog.Ctx(ctx).Error().Err(err).Str("topic", subject).Msg("Error subscribing to topic")
×
152
                }
×
153
        }()
154
        return out, err
2✔
155
}
156

157
func convertCloudEventToMessage(outChan chan *message.Message) func(ctx context.Context, event cloudevents.Event) error {
2✔
158
        return func(ctx context.Context, event cloudevents.Event) error {
6✔
159
                msg := message.NewMessage(event.ID(), event.Data())
4✔
160
                msg.SetContext(ctx)
4✔
161
                // Add some extra message metadata from the CloudEvent
4✔
162
                msg.Metadata.Set("ce-id", event.ID())
4✔
163
                msg.Metadata.Set("ce-source", event.Source())
4✔
164
                msg.Metadata.Set("ce-type", event.Type())
4✔
165
                msg.Metadata.Set("ce-subject", event.Subject())
4✔
166
                msg.Metadata.Set("ce-time", event.Time().String())
4✔
167
                msg.Metadata.Set("ce-datacontenttype", event.DataContentType())
4✔
168
                msg.Metadata.Set("ce-schemaurl", event.DataSchema())
4✔
169

4✔
170
                for k, v := range event.Extensions() {
5✔
171
                        // Strip "minder" prefix from metadata keys if present
1✔
172
                        // The prefix avoids collision on keys like "type"
1✔
173
                        k = strings.TrimPrefix(k, "minder")
1✔
174
                        // Undo the transformation from 228 in sendEvent
1✔
175
                        k = strings.ReplaceAll(k, "0", "_")
1✔
176
                        msg.Metadata.Set(k, fmt.Sprintf("%s", v))
1✔
177
                }
1✔
178

179
                outChan <- msg
4✔
180
                return nil
4✔
181
        }
182
}
183

184
// Publish implements message.Publisher.
185
func (c *cloudEventsNatsAdapter) Publish(topic string, messages ...*message.Message) error {
4✔
186
        ctx := context.Background()
4✔
187
        subject := fmt.Sprintf("%s.%s", c.cfg.Prefix, topic)
4✔
188

4✔
189
        state, err := c.ensureTopic(ctx, subject, "sender")
4✔
190
        if err != nil {
4✔
NEW
191
                return fmt.Errorf("error creating topic %q: %w", subject, err)
×
192
        }
×
193

194
        for _, msg := range messages {
8✔
195
                err := sendEvent(ctx, subject, state.ceClient, msg)
4✔
196
                if err != nil {
4✔
NEW
197
                        return fmt.Errorf("error sending event to %q: %w", subject, err)
×
198
                }
×
199
        }
200
        return nil
4✔
201
}
202

203
func sendEvent(
204
        ctx context.Context, eventType string, ceClient cloudevents.Client, msg *message.Message) error {
4✔
205
        event := cloudevents.NewEvent()
4✔
206
        event.SetID(msg.UUID)
4✔
207
        event.SetType(eventType)
4✔
208
        event.SetSource("minder") // The system which generated the event.  The Minder URL would be nice here.
4✔
209
        event.SetSubject("TODO")  // This *should* represent the entity, but we don't have a standard field for it yet.
4✔
210

4✔
211
        // All our current payloads are encoded JSON; we need to unmarshal
4✔
212
        payload := map[string]any{}
4✔
213
        if err := json.Unmarshal(msg.Payload, &payload); err != nil {
4✔
NEW
214
                return fmt.Errorf("error unmarshalling payload: %w", err)
×
215
        }
×
216

217
        err := event.SetData("application/json", payload)
4✔
218
        if err != nil {
4✔
219
                return err
×
220
        }
×
221
        for k, v := range msg.Metadata {
5✔
222
                // CloudEvents does not allow "_" or "-" in attribute keys, only A-Z, a-z, 0-9.
1✔
223
                ceKey := strings.ReplaceAll(k, "_", "0")
1✔
224
                event.SetExtension("minder"+ceKey, v)
1✔
225
        }
1✔
226

227
        return ceClient.Send(ctx, event)
4✔
228
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc